From ed41668eee28350183ceda29daf56c3ac7fa78ed Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 24 Jun 2024 07:57:22 -0700 Subject: [PATCH 01/16] Add test of interoperability of cuDF and arrow BYTE_STREAM_SPLIT encoders (#15832) BYTE_STREAM_SPLIT encoding was recently added to cuDF (#15311). The Parquet specification was recently changed (https://github.com/apache/parquet-format/pull/229) to extend the datatypes that can be encoded as BYTE_STREAM_SPLIT, and this was only recently implemented in arrow (https://github.com/apache/arrow/pull/40094). This PR adds a check that cuDF and arrow can produce compatible files using BYTE_STREAM_SPLIT encoding. Authors: - Ed Seidl (https://github.com/etseidl) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/15832 --- python/cudf/cudf/tests/test_parquet.py | 55 ++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 2596fe8cd37..af79f361b43 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2947,6 +2947,61 @@ def test_per_column_options_string_col(tmpdir, encoding): assert encoding in fmd.row_group(0).column(0).encodings +@pytest.mark.parametrize( + "num_rows", + [200, 10000], +) +def test_parquet_bss_round_trip(tmpdir, num_rows): + def flba(i): + hasher = hashlib.sha256() + hasher.update(i.to_bytes(4, "little")) + return hasher.digest() + + # use pyarrow to write table of types that support BYTE_STREAM_SPLIT encoding + rows_per_rowgroup = 5000 + fixed_data = pa.array( + [flba(i) for i in range(num_rows)], type=pa.binary(32) + ) + i32_data = pa.array(list(range(num_rows)), type=pa.int32()) + i64_data = pa.array(list(range(num_rows)), type=pa.int64()) + f32_data = pa.array([float(i) for i in range(num_rows)], type=pa.float32()) + f64_data = pa.array([float(i) for i in range(num_rows)], type=pa.float64()) + padf = pa.Table.from_arrays( + [fixed_data, i32_data, i64_data, f32_data, f64_data], + names=["flba", "i32", "i64", "f32", "f64"], + ) + padf_fname = tmpdir.join("padf.parquet") + pq.write_table( + padf, + padf_fname, + column_encoding="BYTE_STREAM_SPLIT", + use_dictionary=False, + row_group_size=rows_per_rowgroup, + ) + + # round trip data with cudf + cdf = cudf.read_parquet(padf_fname) + cdf_fname = tmpdir.join("cdf.parquet") + cdf.to_parquet( + cdf_fname, + column_type_length={"flba": 32}, + column_encoding={ + "flba": "BYTE_STREAM_SPLIT", + "i32": "BYTE_STREAM_SPLIT", + "i64": "BYTE_STREAM_SPLIT", + "f32": "BYTE_STREAM_SPLIT", + "f64": "BYTE_STREAM_SPLIT", + }, + row_group_size_rows=rows_per_rowgroup, + ) + + # now read back in with pyarrow to test it was written properly by cudf + padf2 = pq.read_table(padf_fname) + padf3 = pq.read_table(cdf_fname) + assert_eq(padf2, padf3) + assert_eq(padf2.schema[0].type, padf3.schema[0].type) + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" From c33e0a349b2d0c2a626364845e616cfd3d04afc6 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 24 Jun 2024 17:18:19 +0100 Subject: [PATCH 02/16] Add coverage for both expression and dataframe filter (#16002) Note that expression filter with literals does not work because broadcasting is not implemented. It is also the case that the result could be computed without broadcasting in the case of scalars with some data introspection, but we do not do that here. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Thomas Li (https://github.com/lithomas1) URL: https://github.com/rapidsai/cudf/pull/16002 --- .../tests/expressions/test_filter.py | 30 ++++++++++++++----- python/cudf_polars/tests/test_filter.py | 26 ++++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) create mode 100644 python/cudf_polars/tests/test_filter.py diff --git a/python/cudf_polars/tests/expressions/test_filter.py b/python/cudf_polars/tests/expressions/test_filter.py index 783403d764c..1a8e994e3aa 100644 --- a/python/cudf_polars/tests/expressions/test_filter.py +++ b/python/cudf_polars/tests/expressions/test_filter.py @@ -2,19 +2,35 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations +import pytest + import polars as pl from cudf_polars.testing.asserts import assert_gpu_result_equal -def test_filter(): - ldf = pl.DataFrame( +@pytest.mark.parametrize( + "expr", + [ + pytest.param( + pl.lit(value=False), + marks=pytest.mark.xfail(reason="Expression filter does not handle scalars"), + ), + pl.col("c"), + pl.col("b") > 2, + ], +) +@pytest.mark.parametrize("predicate_pushdown", [False, True]) +def test_filter_expression(expr, predicate_pushdown): + ldf = pl.LazyFrame( { "a": [1, 2, 3, 4, 5, 6, 7], - "b": [1, 1, 1, 1, 1, 1, 1], + "b": [0, 3, 1, 5, 6, 1, 0], + "c": [None, True, False, False, True, True, False], } - ).lazy() + ) - # group-by is just to avoid the filter being pushed into the scan. - query = ldf.group_by(pl.col("a")).agg(pl.col("b").sum()).filter(pl.col("b") < 1) - assert_gpu_result_equal(query) + query = ldf.select(pl.col("a").filter(expr)) + assert_gpu_result_equal( + query, collect_kwargs={"predicate_pushdown": predicate_pushdown} + ) diff --git a/python/cudf_polars/tests/test_filter.py b/python/cudf_polars/tests/test_filter.py new file mode 100644 index 00000000000..f39b348144b --- /dev/null +++ b/python/cudf_polars/tests/test_filter.py @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.mark.parametrize("expr", [pl.col("c"), pl.col("b") < 1, pl.lit(value=True)]) +@pytest.mark.parametrize("predicate_pushdown", [False, True]) +def test_filter(expr, predicate_pushdown): + ldf = pl.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7], + "b": [1, 1, 1, 1, 1, 1, 1], + "c": [True, False, False, True, True, True, None], + } + ).lazy() + + query = ldf.filter(expr) + assert_gpu_result_equal( + query, collect_kwargs={"predicate_pushdown": predicate_pushdown} + ) From f3183c11a71f90cd1096d95f6ded5ecf38b49a55 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 24 Jun 2024 17:24:24 +0100 Subject: [PATCH 03/16] Add full coverage for whole-frame Agg expressions (#15997) Also add more expansive comments on the unreachable paths. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Thomas Li (https://github.com/lithomas1) URL: https://github.com/rapidsai/cudf/pull/15997 --- python/cudf_polars/cudf_polars/dsl/expr.py | 58 ++++++++----------- .../cudf_polars/tests/expressions/test_agg.py | 14 +++++ 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index c92e0714d54..73f3c1ce289 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -952,7 +952,9 @@ def __init__( self.options = options self.children = (value,) if name not in Agg._SUPPORTED: - raise NotImplementedError(f"Unsupported aggregation {name=}") + raise NotImplementedError( + f"Unsupported aggregation {name=}" + ) # pragma: no cover; all valid aggs are supported # TODO: nan handling in groupby case if name == "min": req = plc.aggregation.min() @@ -978,7 +980,9 @@ def __init__( elif name == "count": req = plc.aggregation.count(null_handling=plc.types.NullPolicy.EXCLUDE) else: - raise NotImplementedError + raise NotImplementedError( + f"Unreachable, {name=} is incorrectly listed in _SUPPORTED" + ) # pragma: no cover self.request = req op = getattr(self, f"_{name}", None) if op is None: @@ -988,7 +992,9 @@ def __init__( elif name in {"count", "first", "last"}: pass else: - raise AssertionError + raise NotImplementedError( + f"Unreachable, supported agg {name=} has no implementation" + ) # pragma: no cover self.op = op _SUPPORTED: ClassVar[frozenset[str]] = frozenset( @@ -1010,11 +1016,15 @@ def __init__( def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" if depth >= 1: - raise NotImplementedError("Nested aggregations in groupby") + raise NotImplementedError( + "Nested aggregations in groupby" + ) # pragma: no cover; check_agg trips first (child,) = self.children ((expr, _, _),) = child.collect_agg(depth=depth + 1).requests if self.request is None: - raise NotImplementedError(f"Aggregation {self.name} in groupby") + raise NotImplementedError( + f"Aggregation {self.name} in groupby" + ) # pragma: no cover; __init__ trips first return AggInfo([(expr, self.request, self)]) def _reduce( @@ -1024,10 +1034,7 @@ def _reduce( plc.Column.from_scalar( plc.reduce.reduce(column.obj, request, self.dtype), 1, - ), - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, + ) ) def _count(self, column: Column) -> Column: @@ -1040,10 +1047,7 @@ def _count(self, column: Column) -> Column: ), ), 1, - ), - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, + ) ) def _min(self, column: Column, *, propagate_nans: bool) -> Column: @@ -1054,10 +1058,7 @@ def _min(self, column: Column, *, propagate_nans: bool) -> Column: pa.scalar(float("nan"), type=plc.interop.to_arrow(self.dtype)) ), 1, - ), - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, + ) ) if column.nan_count > 0: column = column.mask_nans() @@ -1071,31 +1072,18 @@ def _max(self, column: Column, *, propagate_nans: bool) -> Column: pa.scalar(float("nan"), type=plc.interop.to_arrow(self.dtype)) ), 1, - ), - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, + ) ) if column.nan_count > 0: column = column.mask_nans() return self._reduce(column, request=plc.aggregation.max()) def _first(self, column: Column) -> Column: - return Column( - plc.copying.slice(column.obj, [0, 1])[0], - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, - ) + return Column(plc.copying.slice(column.obj, [0, 1])[0]) def _last(self, column: Column) -> Column: n = column.obj.size() - return Column( - plc.copying.slice(column.obj, [n - 1, n])[0], - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, - ) + return Column(plc.copying.slice(column.obj, [n - 1, n])[0]) def do_evaluate( self, @@ -1106,7 +1094,9 @@ def do_evaluate( ) -> Column: """Evaluate this expression given a dataframe for context.""" if context is not ExecutionContext.FRAME: - raise NotImplementedError(f"Agg in context {context}") + raise NotImplementedError( + f"Agg in context {context}" + ) # pragma: no cover; unreachable (child,) = self.children return self.op(child.evaluate(df, context=context, mapping=mapping)) diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index b044bbb2885..2ffa1c4af6d 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -56,3 +56,17 @@ def test_agg(df, agg): with pytest.raises(AssertionError): assert_gpu_result_equal(q) assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False) + + +@pytest.mark.parametrize( + "propagate_nans", + [pytest.param(False, marks=pytest.mark.xfail(reason="Need to mask nans")), True], + ids=["mask_nans", "propagate_nans"], +) +@pytest.mark.parametrize("op", ["min", "max"]) +def test_agg_float_with_nans(propagate_nans, op): + df = pl.LazyFrame({"a": [1, 2, float("nan")]}) + op = getattr(pl.Expr, f"nan_{op}" if propagate_nans else op) + q = df.select(op(pl.col("a"))) + + assert_gpu_result_equal(q) From 0c6b828118fa371e3fd333718bc872085373a076 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 24 Jun 2024 07:05:37 -1000 Subject: [PATCH 04/16] Restrict the allowed pandas timezone objects in cudf (#16013) Since cudf's timezone support is based on the OS's tz data and hence `zoneinfo`, cudf cannot naturally support the variety of timezone objects supported by pandas (`pytz`, `dateutil`, etc). Therefore: * In pandas compatible mode, only accept pandas objects with zoneinfo timezones. * Otherwise, try to convert the pandas timezone to an equivalent zoneinfo object e.g. `pytz.timezone("US/Pacific")`-> `zoneinfo.ZoneInfo("US/Pacific")` Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/16013 --- python/cudf/cudf/core/_internals/timezones.py | 33 ++++++++++++++- python/cudf/cudf/core/column/column.py | 16 ++++++++ python/cudf/cudf/core/column/datetime.py | 33 +++++++-------- .../tests/indexes/datetime/test_indexing.py | 12 +++--- .../indexes/datetime/test_time_specific.py | 13 +++--- .../cudf/tests/series/test_datetimelike.py | 40 ++++++++++++++++--- 6 files changed, 108 insertions(+), 39 deletions(-) diff --git a/python/cudf/cudf/core/_internals/timezones.py b/python/cudf/cudf/core/_internals/timezones.py index 269fcf3e37f..29cb9d7bd12 100644 --- a/python/cudf/cudf/core/_internals/timezones.py +++ b/python/cudf/cudf/core/_internals/timezones.py @@ -1,21 +1,50 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. from __future__ import annotations +import datetime import os import zoneinfo from functools import lru_cache from typing import TYPE_CHECKING, Literal import numpy as np +import pandas as pd +import cudf from cudf._lib.timezone import make_timezone_transition_table -from cudf.core.column.column import as_column if TYPE_CHECKING: from cudf.core.column.datetime import DatetimeColumn from cudf.core.column.timedelta import TimeDeltaColumn +def get_compatible_timezone(dtype: pd.DatetimeTZDtype) -> pd.DatetimeTZDtype: + """Convert dtype.tz object to zoneinfo object if possible.""" + tz = dtype.tz + if isinstance(tz, zoneinfo.ZoneInfo): + return dtype + if cudf.get_option("mode.pandas_compatible"): + raise NotImplementedError( + f"{tz} must be a zoneinfo.ZoneInfo object in pandas_compatible mode." + ) + elif (tzname := getattr(tz, "zone", None)) is not None: + # pytz-like + key = tzname + elif (tz_file := getattr(tz, "_filename", None)) is not None: + # dateutil-like + key = tz_file.split("zoneinfo/")[-1] + elif isinstance(tz, datetime.tzinfo): + # Try to get UTC-like tzinfos + reference = datetime.datetime.now() + key = tz.tzname(reference) + if not (isinstance(key, str) and key.lower() == "utc"): + raise NotImplementedError(f"cudf does not support {tz}") + else: + raise NotImplementedError(f"cudf does not support {tz}") + new_tz = zoneinfo.ZoneInfo(key) + return pd.DatetimeTZDtype(dtype.unit, new_tz) + + @lru_cache(maxsize=20) def get_tz_data(zone_name: str) -> tuple[DatetimeColumn, TimeDeltaColumn]: """ @@ -87,6 +116,8 @@ def _read_tzfile_as_columns( ) if not transition_times_and_offsets: + from cudf.core.column.column import as_column + # this happens for UTC-like zones min_date = np.int64(np.iinfo("int64").min + 1).astype("M8[s]") return (as_column([min_date]), as_column([np.timedelta64(0, "s")])) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index c4e715aeb45..586689e2ee3 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -47,6 +47,7 @@ is_string_dtype, ) from cudf.core._compat import PANDAS_GE_210 +from cudf.core._internals.timezones import get_compatible_timezone from cudf.core.abc import Serializable from cudf.core.buffer import ( Buffer, @@ -1854,6 +1855,21 @@ def as_column( arbitrary.dtype, (pd.CategoricalDtype, pd.IntervalDtype, pd.DatetimeTZDtype), ): + if isinstance(arbitrary.dtype, pd.DatetimeTZDtype): + new_tz = get_compatible_timezone(arbitrary.dtype) + arbitrary = arbitrary.astype(new_tz) + if isinstance(arbitrary.dtype, pd.CategoricalDtype) and isinstance( + arbitrary.dtype.categories.dtype, pd.DatetimeTZDtype + ): + new_tz = get_compatible_timezone( + arbitrary.dtype.categories.dtype + ) + new_cats = arbitrary.dtype.categories.astype(new_tz) + new_dtype = pd.CategoricalDtype( + categories=new_cats, ordered=arbitrary.dtype.ordered + ) + arbitrary = arbitrary.astype(new_dtype) + return as_column( pa.array(arbitrary, from_pandas=True), nan_as_null=nan_as_null, diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py index 9ac761b6be1..d88553361dd 100644 --- a/python/cudf/cudf/core/column/datetime.py +++ b/python/cudf/cudf/core/column/datetime.py @@ -21,6 +21,11 @@ from cudf._lib.search import search_sorted from cudf.api.types import is_datetime64_dtype, is_scalar, is_timedelta64_dtype from cudf.core._compat import PANDAS_GE_220 +from cudf.core._internals.timezones import ( + check_ambiguous_and_nonexistent, + get_compatible_timezone, + get_tz_data, +) from cudf.core.column import ColumnBase, as_column, column, string from cudf.core.column.timedelta import _unit_to_nanoseconds_conversion from cudf.utils.dtypes import _get_base_dtype @@ -282,8 +287,6 @@ def __contains__(self, item: ScalarLike) -> bool: @functools.cached_property def time_unit(self) -> str: - if isinstance(self.dtype, pd.DatetimeTZDtype): - return self.dtype.unit return np.datetime_data(self.dtype)[0] @property @@ -725,8 +728,6 @@ def _find_ambiguous_and_nonexistent( transitions occur in the time zone database for the given timezone. If no transitions occur, the tuple `(False, False)` is returned. """ - from cudf.core._internals.timezones import get_tz_data - transition_times, offsets = get_tz_data(zone_name) offsets = offsets.astype(f"timedelta64[{self.time_unit}]") # type: ignore[assignment] @@ -785,26 +786,22 @@ def tz_localize( ambiguous: Literal["NaT"] = "NaT", nonexistent: Literal["NaT"] = "NaT", ): - from cudf.core._internals.timezones import ( - check_ambiguous_and_nonexistent, - get_tz_data, - ) - if tz is None: return self.copy() ambiguous, nonexistent = check_ambiguous_and_nonexistent( ambiguous, nonexistent ) - dtype = pd.DatetimeTZDtype(self.time_unit, tz) + dtype = get_compatible_timezone(pd.DatetimeTZDtype(self.time_unit, tz)) + tzname = dtype.tz.key ambiguous_col, nonexistent_col = self._find_ambiguous_and_nonexistent( - tz + tzname ) localized = self._scatter_by_column( self.isnull() | (ambiguous_col | nonexistent_col), cudf.Scalar(cudf.NaT, dtype=self.dtype), ) - transition_times, offsets = get_tz_data(tz) + transition_times, offsets = get_tz_data(tzname) transition_times_local = (transition_times + offsets).astype( localized.dtype ) @@ -845,7 +842,7 @@ def __init__( offset=offset, null_count=null_count, ) - self._dtype = dtype + self._dtype = get_compatible_timezone(dtype) def to_pandas( self, @@ -865,6 +862,10 @@ def to_arrow(self): self._local_time.to_arrow(), str(self.dtype.tz) ) + @functools.cached_property + def time_unit(self) -> str: + return self.dtype.unit + @property def _utc_time(self): """Return UTC time as naive timestamps.""" @@ -880,8 +881,6 @@ def _utc_time(self): @property def _local_time(self): """Return the local time as naive timestamps.""" - from cudf.core._internals.timezones import get_tz_data - transition_times, offsets = get_tz_data(str(self.dtype.tz)) transition_times = transition_times.astype(_get_base_dtype(self.dtype)) indices = search_sorted([transition_times], [self], "right") - 1 @@ -911,10 +910,6 @@ def __repr__(self): ) def tz_localize(self, tz: str | None, ambiguous="NaT", nonexistent="NaT"): - from cudf.core._internals.timezones import ( - check_ambiguous_and_nonexistent, - ) - if tz is None: return self._local_time ambiguous, nonexistent = check_ambiguous_and_nonexistent( diff --git a/python/cudf/cudf/tests/indexes/datetime/test_indexing.py b/python/cudf/cudf/tests/indexes/datetime/test_indexing.py index f2c2d9a263b..ee4d0f7e816 100644 --- a/python/cudf/cudf/tests/indexes/datetime/test_indexing.py +++ b/python/cudf/cudf/tests/indexes/datetime/test_indexing.py @@ -1,4 +1,5 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +import zoneinfo import pandas as pd @@ -7,13 +8,10 @@ def test_slice_datetimetz_index(): + tz = zoneinfo.ZoneInfo("US/Eastern") data = ["2001-01-01", "2001-01-02", None, None, "2001-01-03"] - pidx = pd.DatetimeIndex(data, dtype="datetime64[ns]").tz_localize( - "US/Eastern" - ) - idx = cudf.DatetimeIndex(data, dtype="datetime64[ns]").tz_localize( - "US/Eastern" - ) + pidx = pd.DatetimeIndex(data, dtype="datetime64[ns]").tz_localize(tz) + idx = cudf.DatetimeIndex(data, dtype="datetime64[ns]").tz_localize(tz) expected = pidx[1:4] got = idx[1:4] assert_eq(expected, got) diff --git a/python/cudf/cudf/tests/indexes/datetime/test_time_specific.py b/python/cudf/cudf/tests/indexes/datetime/test_time_specific.py index b28ef131025..77b32b8ce89 100644 --- a/python/cudf/cudf/tests/indexes/datetime/test_time_specific.py +++ b/python/cudf/cudf/tests/indexes/datetime/test_time_specific.py @@ -1,4 +1,6 @@ # Copyright (c) 2022-2024, NVIDIA CORPORATION. +import zoneinfo + import pandas as pd import cudf @@ -6,24 +8,21 @@ def test_tz_localize(): + tz = zoneinfo.ZoneInfo("America/New_York") pidx = pd.date_range("2001-01-01", "2001-01-02", freq="1s") pidx = pidx.astype(" Date: Mon, 24 Jun 2024 18:25:10 +0100 Subject: [PATCH 05/16] Add tests of expression-based sort and sort-by (#16008) We only need stable vs unstable variants for the sort-by case, since when sorting a single column by itself there is no distinction between stable and unstable. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16008 --- .../tests/expressions/test_sort.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 python/cudf_polars/tests/expressions/test_sort.py diff --git a/python/cudf_polars/tests/expressions/test_sort.py b/python/cudf_polars/tests/expressions/test_sort.py new file mode 100644 index 00000000000..0195266f5c6 --- /dev/null +++ b/python/cudf_polars/tests/expressions/test_sort.py @@ -0,0 +1,53 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import itertools + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.mark.parametrize("descending", [False, True]) +@pytest.mark.parametrize("nulls_last", [False, True]) +def test_sort_expression(descending, nulls_last): + ldf = pl.LazyFrame( + { + "a": [5, -1, 3, 4, None, 8, 6, 7, None], + } + ) + + query = ldf.select(pl.col("a").sort(descending=descending, nulls_last=nulls_last)) + assert_gpu_result_equal(query) + + +@pytest.mark.parametrize( + "descending", itertools.combinations_with_replacement([False, True], 3) +) +@pytest.mark.parametrize( + "nulls_last", itertools.combinations_with_replacement([False, True], 3) +) +@pytest.mark.parametrize("maintain_order", [False, True], ids=["unstable", "stable"]) +def test_sort_by_expression(descending, nulls_last, maintain_order): + ldf = pl.LazyFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "b": [1, 2, 2, 3, 9, 5, -1, 2, -2, 16], + "c": ["a", "A", "b", "b", "c", "d", "A", "Z", "ä", "̈Ä"], + } + ) + + query = ldf.select( + pl.col("a").sort_by( + pl.col("b"), + pl.col("c"), + pl.col("b") + pl.col("a"), + descending=descending, + nulls_last=nulls_last, + maintain_order=maintain_order, + ) + ) + assert_gpu_result_equal(query, check_row_order=maintain_order) From 4d4cdce2128398444a15f705d05ca062a6f0300f Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 24 Jun 2024 18:51:51 +0100 Subject: [PATCH 06/16] Add full coverage of utility functions (#15995) The datetime conversion tests just test that we can round-trip correctly for now. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Thomas Li (https://github.com/lithomas1) - Kyle Edwards (https://github.com/KyleFromNVIDIA) URL: https://github.com/rapidsai/cudf/pull/15995 --- .../cudf_polars/cudf_polars/utils/dtypes.py | 4 +-- .../cudf_polars/cudf_polars/utils/sorting.py | 4 +-- python/cudf_polars/pyproject.toml | 7 ++++ .../tests/expressions/test_datetime_basic.py | 34 +++++++++++++++++++ python/cudf_polars/tests/utils/test_dtypes.py | 31 +++++++++++++++++ .../cudf_polars/tests/utils/test_sorting.py | 21 ++++++++++++ 6 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 python/cudf_polars/tests/expressions/test_datetime_basic.py create mode 100644 python/cudf_polars/tests/utils/test_dtypes.py create mode 100644 python/cudf_polars/tests/utils/test_sorting.py diff --git a/python/cudf_polars/cudf_polars/utils/dtypes.py b/python/cudf_polars/cudf_polars/utils/dtypes.py index 7b0049daf11..3d4a643e1fc 100644 --- a/python/cudf_polars/cudf_polars/utils/dtypes.py +++ b/python/cudf_polars/cudf_polars/utils/dtypes.py @@ -70,7 +70,7 @@ def from_polars(dtype: pl.DataType) -> plc.DataType: return plc.DataType(plc.TypeId.TIMESTAMP_MICROSECONDS) elif dtype.time_unit == "ns": return plc.DataType(plc.TypeId.TIMESTAMP_NANOSECONDS) - assert dtype.time_unit is not None + assert dtype.time_unit is not None # pragma: no cover assert_never(dtype.time_unit) elif isinstance(dtype, pl.Duration): if dtype.time_unit == "ms": @@ -79,7 +79,7 @@ def from_polars(dtype: pl.DataType) -> plc.DataType: return plc.DataType(plc.TypeId.DURATION_MICROSECONDS) elif dtype.time_unit == "ns": return plc.DataType(plc.TypeId.DURATION_NANOSECONDS) - assert dtype.time_unit is not None + assert dtype.time_unit is not None # pragma: no cover assert_never(dtype.time_unit) elif isinstance(dtype, pl.String): return plc.DataType(plc.TypeId.STRING) diff --git a/python/cudf_polars/cudf_polars/utils/sorting.py b/python/cudf_polars/cudf_polars/utils/sorting.py index 24fd449dd88..57f94c4ec4c 100644 --- a/python/cudf_polars/cudf_polars/utils/sorting.py +++ b/python/cudf_polars/cudf_polars/utils/sorting.py @@ -43,8 +43,8 @@ def sort_order( for d in descending ] null_precedence = [] - # TODO: use strict=True when we drop py39 - assert len(descending) == len(nulls_last) + if len(descending) != len(nulls_last) or len(descending) != num_keys: + raise ValueError("Mismatching length of arguments in sort_order") for asc, null_last in zip(column_order, nulls_last): if (asc == plc.types.Order.ASCENDING) ^ (not null_last): null_precedence.append(plc.types.NullOrder.AFTER) diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index face04b9bd8..effa4861e0c 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -52,6 +52,13 @@ version = {file = "cudf_polars/VERSION"} [tool.pytest.ini_options] xfail_strict = true +[tool.coverage.report] +exclude_also = [ + "if TYPE_CHECKING:", + "class .*\\bProtocol\\):", + "assert_never\\(" +] + [tool.ruff] line-length = 88 indent-width = 4 diff --git a/python/cudf_polars/tests/expressions/test_datetime_basic.py b/python/cudf_polars/tests/expressions/test_datetime_basic.py new file mode 100644 index 00000000000..6ba2a1dce1e --- /dev/null +++ b/python/cudf_polars/tests/expressions/test_datetime_basic.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.mark.parametrize( + "dtype", + [ + pl.Date(), + pl.Datetime("ms"), + pl.Datetime("us"), + pl.Datetime("ns"), + pl.Duration("ms"), + pl.Duration("us"), + pl.Duration("ns"), + ], + ids=repr, +) +def test_datetime_dataframe_scan(dtype): + ldf = pl.DataFrame( + { + "a": pl.Series([1, 2, 3, 4, 5, 6, 7], dtype=dtype), + "b": pl.Series([3, 4, 5, 6, 7, 8, 9], dtype=pl.UInt16), + } + ).lazy() + + query = ldf.select(pl.col("b"), pl.col("a")) + assert_gpu_result_equal(query) diff --git a/python/cudf_polars/tests/utils/test_dtypes.py b/python/cudf_polars/tests/utils/test_dtypes.py new file mode 100644 index 00000000000..535fdd846a0 --- /dev/null +++ b/python/cudf_polars/tests/utils/test_dtypes.py @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.utils.dtypes import from_polars + + +@pytest.mark.parametrize( + "pltype", + [ + pl.Time(), + pl.Struct({"a": pl.Int8, "b": pl.Float32}), + pl.Datetime("ms", time_zone="US/Pacific"), + pl.Array(pl.Int8, 2), + pl.Binary(), + pl.Categorical(), + pl.Enum(["a", "b"]), + pl.Field("a", pl.Int8), + pl.Object(), + pl.Unknown(), + ], + ids=repr, +) +def test_unhandled_dtype_conversion_raises(pltype): + with pytest.raises(NotImplementedError): + _ = from_polars(pltype) diff --git a/python/cudf_polars/tests/utils/test_sorting.py b/python/cudf_polars/tests/utils/test_sorting.py new file mode 100644 index 00000000000..4e98a3a7ce7 --- /dev/null +++ b/python/cudf_polars/tests/utils/test_sorting.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +from cudf_polars.utils.sorting import sort_order + + +@pytest.mark.parametrize( + "descending,nulls_last,num_keys", + [ + ([True], [False, True], 3), + ([True, True], [False, True, False], 3), + ([False, True], [True], 3), + ], +) +def test_sort_order_raises_mismatch(descending, nulls_last, num_keys): + with pytest.raises(ValueError): + _ = sort_order(descending, nulls_last=nulls_last, num_keys=num_keys) From 9987410c4baa275c9ae46801112bc4b6d8d6b057 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 24 Jun 2024 11:16:56 -0700 Subject: [PATCH 07/16] Account for FIXED_LEN_BYTE_ARRAY when calculating fragment sizes in Parquet writer (#16064) The number of rows per fragment will be off by a factor of 4 for FIXED_LEN_BYTE_ARRAY columns. This results in many more fragments than are necessary to achieve user requested page size limits. This PR shifts where the determination of whether a column has fixed-width data to a location where knowledge of the schema can be used. Authors: - Ed Seidl (https://github.com/etseidl) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: https://github.com/rapidsai/cudf/pull/16064 --- cpp/src/io/parquet/writer_impl.cu | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index ca15b532d07..bed4dbc5a66 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -296,19 +296,6 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream) CUDF_FAIL("Unexpected compound type"); } -// checks to see if the given column has a fixed size. This doesn't -// check every row, so assumes string and list columns are not fixed, even -// if each row is the same width. -// TODO: update this if FIXED_LEN_BYTE_ARRAY is ever supported for writes. -bool is_col_fixed_width(column_view const& column) -{ - if (column.type().id() == type_id::STRUCT) { - return std::all_of(column.child_begin(), column.child_end(), is_col_fixed_width); - } - - return is_fixed_width(column.type()); -} - /** * @brief Extends SchemaElement to add members required in constructing parquet_column_view * @@ -946,6 +933,15 @@ struct parquet_column_view { return schema_node.converted_type.value_or(UNKNOWN); } + // Checks to see if the given column has a fixed-width data type. This doesn't + // check every value, so it assumes string and list columns are not fixed-width, even + // if each value has the same size. + [[nodiscard]] bool is_fixed_width() const + { + // lists and strings are not fixed width + return max_rep_level() == 0 and physical_type() != Type::BYTE_ARRAY; + } + std::vector const& get_path_in_schema() { return path_in_schema; } // LIST related member functions @@ -1764,7 +1760,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // unbalanced in final page sizes, so using 4 which seems to be a good // compromise at smoothing things out without getting fragment sizes too small. auto frag_size_fn = [&](auto const& col, size_t col_size) { - int const target_frags_per_page = is_col_fixed_width(col) ? 1 : 4; + int const target_frags_per_page = col.is_fixed_width() ? 1 : 4; auto const avg_len = target_frags_per_page * util::div_rounding_up_safe(col_size, input.num_rows()); if (avg_len > 0) { @@ -1775,8 +1771,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } }; - std::transform(single_streams_table.begin(), - single_streams_table.end(), + std::transform(parquet_columns.begin(), + parquet_columns.end(), column_sizes.begin(), column_frag_size.begin(), frag_size_fn); From f583879e2fb90c104dee259b676e836ed6e60ca0 Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Mon, 24 Jun 2024 13:40:08 -0500 Subject: [PATCH 08/16] More safely parse CUDA versions when subprocess output is contaminated (#16067) In some user environments, calling a subprocess may produce output that confuses the version parsing machinery inside `_ptxcompiler`. Since the affected functions are vendored from the real `ptxcompiler` package for the purposes of using them with CUDA 12, this fix will only these situations for CUDA 12+. Closes https://github.com/rapidsai/cudf/issues/16016. Authors: - https://github.com/brandon-b-miller Approvers: - Bradley Dice (https://github.com/bdice) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16067 --- python/cudf/cudf/utils/_ptxcompiler.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/utils/_ptxcompiler.py b/python/cudf/cudf/utils/_ptxcompiler.py index 54f5ea08ee1..9d7071d55a5 100644 --- a/python/cudf/cudf/utils/_ptxcompiler.py +++ b/python/cudf/cudf/utils/_ptxcompiler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,14 @@ import math import os +import re import subprocess import sys import warnings NO_DRIVER = (math.inf, math.inf) +START_TAG = "_VER_START" +END_TAG = "_VER_END" NUMBA_CHECK_VERSION_CMD = """\ from ctypes import c_int, byref @@ -28,7 +31,7 @@ drv_major = dv.value // 1000 drv_minor = (dv.value - (drv_major * 1000)) // 10 run_major, run_minor = cuda.runtime.get_version() -print(f'{drv_major} {drv_minor} {run_major} {run_minor}') +print(f'_VER_START{drv_major} {drv_minor} {run_major} {run_minor}_VER_END') """ @@ -61,7 +64,11 @@ def get_versions(): warnings.warn(msg, UserWarning) return NO_DRIVER - versions = [int(s) for s in cp.stdout.strip().split()] + pattern = r"_VER_START(.*?)_VER_END" + + ver_str = re.search(pattern, cp.stdout.decode()).group(1) + + versions = [int(s) for s in ver_str.strip().split()] driver_version = tuple(versions[:2]) runtime_version = tuple(versions[2:]) From bd76bf6b293b7f17a846df8392c18d92ced2b40f Mon Sep 17 00:00:00 2001 From: brandon-b-miller <53796099+brandon-b-miller@users.noreply.github.com> Date: Mon, 24 Jun 2024 13:43:33 -0500 Subject: [PATCH 09/16] cuDF/libcudf exponentially weighted moving averages (#9027) Adds an exponentially weighted moving average aggregation to `cudf::scan` and plumbs it up through `cudf.Series.ewm`, similar to `pandas.Series.ewm`. partially resolves https://github.com/rapidsai/cudf/issues/1263 Authors: - https://github.com/brandon-b-miller - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/9027 --- cpp/CMakeLists.txt | 1 + cpp/include/cudf/aggregation.hpp | 41 ++- .../cudf/detail/aggregation/aggregation.hpp | 44 +++ cpp/src/aggregation/aggregation.cpp | 22 ++ cpp/src/reductions/scan/ewm.cu | 330 ++++++++++++++++++ cpp/src/reductions/scan/scan.cuh | 7 + cpp/src/reductions/scan/scan_inclusive.cu | 3 +- cpp/tests/CMakeLists.txt | 1 + cpp/tests/reductions/ewm_tests.cpp | 101 ++++++ .../source/user_guide/api_docs/dataframe.rst | 1 + .../source/user_guide/api_docs/series.rst | 1 + python/cudf/cudf/_lib/aggregation.pyx | 8 + .../cudf/cudf/_lib/pylibcudf/aggregation.pxd | 3 + .../cudf/cudf/_lib/pylibcudf/aggregation.pyx | 26 ++ .../_lib/pylibcudf/libcudf/aggregation.pxd | 8 + python/cudf/cudf/core/indexed_frame.py | 28 +- python/cudf/cudf/core/window/__init__.py | 4 +- python/cudf/cudf/core/window/ewm.py | 200 +++++++++++ python/cudf/cudf/core/window/rolling.py | 22 +- python/cudf/cudf/pandas/_wrappers/pandas.py | 2 +- python/cudf/cudf/tests/test_ewm.py | 46 +++ 21 files changed, 892 insertions(+), 7 deletions(-) create mode 100644 cpp/src/reductions/scan/ewm.cu create mode 100644 cpp/tests/reductions/ewm_tests.cpp create mode 100644 python/cudf/cudf/core/window/ewm.py create mode 100644 python/cudf/cudf/tests/test_ewm.py diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index aab0a9b2d49..5fd68bfb26c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -502,6 +502,7 @@ add_library( src/reductions/product.cu src/reductions/reductions.cpp src/reductions/scan/rank_scan.cu + src/reductions/scan/ewm.cu src/reductions/scan/scan.cpp src/reductions/scan/scan_exclusive.cu src/reductions/scan/scan_inclusive.cu diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index d458c831f19..3c1023017be 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,6 +103,7 @@ class aggregation { NUNIQUE, ///< count number of unique elements NTH_ELEMENT, ///< get the nth element ROW_NUMBER, ///< get row-number of current index (relative to rolling window) + EWMA, ///< get exponential weighted moving average at current index RANK, ///< get rank of current index COLLECT_LIST, ///< collect values into a list COLLECT_SET, ///< collect values into a list without duplicate entries @@ -250,6 +251,8 @@ class segmented_reduce_aggregation : public virtual aggregation { enum class udf_type : bool { CUDA, PTX }; /// Type of correlation method. enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN }; +/// Type of treatment of EWM input values' first value +enum class ewm_history : int32_t { INFINITE, FINITE }; /// Factory to create a SUM aggregation /// @return A SUM aggregation object @@ -411,6 +414,42 @@ std::unique_ptr make_nth_element_aggregation( template std::unique_ptr make_row_number_aggregation(); +/** + * @brief Factory to create an EWMA aggregation + * + * `EWMA` returns a non-nullable column with the same type as the input, + * whose values are the exponentially weighted moving average of the input + * sequence. Let these values be known as the y_i. + * + * EWMA aggregations are parameterized by a center of mass (`com`) which + * affects the contribution of the previous values (y_0 ... y_{i-1}) in + * computing the y_i. + * + * EWMA aggregations are also parameterized by a history `cudf::ewm_history`. + * Special considerations have to be given to the mathematical treatment of + * the first value of the input sequence. There are two approaches to this, + * one which considers the first value of the sequence to be the exponential + * weighted moving average of some infinite history of data, and one which + * takes the first value to be the only datapoint known. These assumptions + * lead to two different formulas for the y_i. `ewm_history` selects which. + * + * EWMA aggregations have special null handling. Nulls have two effects. The + * first is to propagate forward the last valid value as far as it has been + * computed. This could be thought of as the nulls not affecting the average + * in any way. The second effect changes the way the y_i are computed. Since + * a moving average is conceptually designed to weight contributing values by + * their recency, nulls ought to count as valid periods even though they do + * not change the average. For example, if the input sequence is {1, NULL, 3} + * then when computing y_2 one should weigh y_0 as if it occurs two periods + * before y_2 rather than just one. + * + * @param center_of_mass the center of mass. + * @param history which assumption to make about the first value + * @return A EWM aggregation object + */ +template +std::unique_ptr make_ewma_aggregation(double const center_of_mass, ewm_history history); + /** * @brief Factory to create a RANK aggregation * diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index edee83783b8..843414817e3 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -76,6 +76,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class nth_element_aggregation const& agg); virtual std::vector> visit(data_type col_type, class row_number_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class ewma_aggregation const& agg); virtual std::vector> visit(data_type col_type, class rank_aggregation const& agg); virtual std::vector> visit( @@ -141,6 +143,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class correlation_aggregation const& agg); virtual void visit(class tdigest_aggregation const& agg); virtual void visit(class merge_tdigest_aggregation const& agg); + virtual void visit(class ewma_aggregation const& agg); }; /** @@ -667,6 +670,40 @@ class row_number_aggregation final : public rolling_aggregation { void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } }; +/** + * @brief Derived class for specifying an ewma aggregation + */ +class ewma_aggregation final : public scan_aggregation { + public: + double const center_of_mass; + cudf::ewm_history history; + + ewma_aggregation(double const center_of_mass, cudf::ewm_history history) + : aggregation{EWMA}, center_of_mass{center_of_mass}, history{history} + { + } + + std::unique_ptr clone() const override + { + return std::make_unique(*this); + } + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + + bool is_equal(aggregation const& _other) const override + { + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return this->center_of_mass == other.center_of_mass and this->history == other.history; + } + + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived class for specifying a rank aggregation */ @@ -1336,6 +1373,11 @@ struct target_type_impl { using type = size_type; }; +template +struct target_type_impl { + using type = double; +}; + // Always use size_type accumulator for RANK template struct target_type_impl { @@ -1536,6 +1578,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::MERGE_TDIGEST: return f.template operator()(std::forward(args)...); + case aggregation::EWMA: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index adee9147740..5422304c5cb 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -154,6 +154,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, ewma_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + std::vector> simple_aggregations_collector::visit( data_type col_type, rank_aggregation const& agg) { @@ -333,6 +339,11 @@ void aggregation_finalizer::visit(row_number_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(ewma_aggregation const& agg) +{ + visit(static_cast(agg)); +} + void aggregation_finalizer::visit(rank_aggregation const& agg) { visit(static_cast(agg)); @@ -665,6 +676,17 @@ std::unique_ptr make_row_number_aggregation() template std::unique_ptr make_row_number_aggregation(); template std::unique_ptr make_row_number_aggregation(); +/// Factory to create an EWMA aggregation +template +std::unique_ptr make_ewma_aggregation(double const com, cudf::ewm_history history) +{ + return std::make_unique(com, history); +} +template std::unique_ptr make_ewma_aggregation(double const com, + cudf::ewm_history history); +template std::unique_ptr make_ewma_aggregation( + double const com, cudf::ewm_history history); + /// Factory to create a RANK aggregation template std::unique_ptr make_rank_aggregation(rank_method method, diff --git a/cpp/src/reductions/scan/ewm.cu b/cpp/src/reductions/scan/ewm.cu new file mode 100644 index 00000000000..3fa2de450ad --- /dev/null +++ b/cpp/src/reductions/scan/ewm.cu @@ -0,0 +1,330 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "scan.cuh" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace cudf { +namespace detail { + +template +using pair_type = thrust::pair; + +/** + * @brief functor to be summed over in a prefix sum such that + * the recurrence in question is solved. See + * G. E. Blelloch. Prefix sums and their applications. Technical Report + * CMU-CS-90-190, Nov. 1990. S. 1.4 + * for details + */ +template +class recurrence_functor { + public: + __device__ pair_type operator()(pair_type ci, pair_type cj) + { + return {ci.first * cj.first, ci.second * cj.first + cj.second}; + } +}; + +template +struct ewma_functor_base { + T beta; + const pair_type IDENTITY{1.0, 0.0}; +}; + +template +struct ewma_adjust_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + // Not const to allow for updating the input value + auto [valid, exp, input] = data; + if (!valid) { return this->IDENTITY; } + if constexpr (not is_numerator) { input = 1; } + + // The value is non-null, but nulls preceding it + // must adjust the second element of the pair + T const beta = this->beta; + return {beta * ((exp != 0) ? pow(beta, exp) : 1), input}; + } +}; + +template +struct ewma_adjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(T const data) + { + T const beta = this->beta; + if constexpr (is_numerator) { + return {beta, data}; + } else { + return {beta, 1.0}; + } + } +}; + +template +struct ewma_noadjust_nulls_functor : public ewma_functor_base { + /* + In the null case, a denominator actually has to be computed. The formula is + y_{i+1} = (1 - alpha)x_{i-1} + alpha x_i, but really there is a "denominator" + which is the sum of the weights: alpha + (1 - alpha) == 1. If a null is + encountered, that means that the "previous" value is downweighted by a + factor (for each missing value). For example with a single null: + data = {x_0, NULL, x_1}, + y_2 = (1 - alpha)**2 x_0 + alpha * x_2 / (alpha + (1-alpha)**2) + + As such, the pairs must be updated before summing like the adjusted case to + properly downweight the previous values. But now but we also need to compute + the normalization factors and divide the results into them at the end. + */ + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index, valid, nullcnt] = data; + if (index == 0) { + return {beta, input}; + } else { + if (!valid) { return this->IDENTITY; } + // preceding value is valid, return normal pair + if (nullcnt == 0) { return {beta, (1.0 - beta) * input}; } + // one or more preceding values is null, adjust by how many + T const factor = (1.0 - beta) + pow(beta, nullcnt + 1); + return {(beta * (pow(beta, nullcnt)) / factor), ((1.0 - beta) * input) / factor}; + } + } +}; + +template +struct ewma_noadjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index] = data; + if (index == 0) { + return {beta, input}; + } else { + return {beta, (1.0 - beta) * input}; + } + } +}; + +/** +* @brief Return an array whose values y_i are the number of null entries +* in between the last valid entry of the input and the current index. +* Example: {1, NULL, 3, 4, NULL, NULL, 7} + -> {0, 0 1, 0, 0, 1, 2} +*/ +rmm::device_uvector null_roll_up(column_view const& input, + rmm::cuda_stream_view stream) +{ + rmm::device_uvector output(input.size(), stream); + + auto device_view = column_device_view::create(input); + auto invalid_it = thrust::make_transform_iterator( + cudf::detail::make_validity_iterator(*device_view), + cuda::proclaim_return_type([] __device__(int valid) -> int { return 1 - valid; })); + + // valid mask {1, 0, 1, 0, 0, 1} leads to output array {0, 0, 1, 0, 1, 2} + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + invalid_it, + invalid_it + input.size() - 1, + invalid_it, + std::next(output.begin())); + return output; +} + +template +rmm::device_uvector compute_ewma_adjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + + if (input.has_nulls()) { + rmm::device_uvector nullcnt = null_roll_up(input, stream); + auto device_view = column_device_view::create(input); + auto valid_it = cudf::detail::make_validity_iterator(*device_view); + auto data = + thrust::make_zip_iterator(thrust::make_tuple(valid_it, nullcnt.begin(), input.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + + } else { + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + input.begin(), + input.end(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + auto itr = thrust::make_counting_iterator(0); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + itr, + itr + input.size(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + } + + thrust::transform( + rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + output.begin(), + [] __device__(pair_type pair, T numerator) -> T { return numerator / pair.second; }); + + return output; +} + +template +rmm::device_uvector compute_ewma_noadjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + rmm::device_uvector nullcnt = + [&input, stream]() -> rmm::device_uvector { + if (input.has_nulls()) { + return null_roll_up(input, stream); + } else { + return rmm::device_uvector(input.size(), stream); + } + }(); + // denominators are all 1 and do not need to be computed + // pairs are all (beta, 1-beta x_i) except for the first one + + if (!input.has_nulls()) { + auto data = thrust::make_zip_iterator( + thrust::make_tuple(input.begin(), thrust::make_counting_iterator(0))); + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_no_nulls_functor{beta}, + recurrence_functor{}); + + } else { + auto device_view = column_device_view::create(input); + auto valid_it = detail::make_validity_iterator(*device_view); + + auto data = thrust::make_zip_iterator(thrust::make_tuple( + input.begin(), thrust::make_counting_iterator(0), valid_it, nullcnt.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_nulls_functor{beta}, + recurrence_functor()); + } + + // copy the second elements to the output for now + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + return output; +} + +struct ewma_functor { + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + CUDF_FAIL("Unsupported type for EWMA."); + } + + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + auto const ewma_agg = dynamic_cast(&agg); + auto const history = ewma_agg->history; + auto const center_of_mass = ewma_agg->center_of_mass; + + // center of mass is easier for the user, but the recurrences are + // better expressed in terms of the derived parameter `beta` + T const beta = center_of_mass / (center_of_mass + 1.0); + + auto result = [&]() { + if (history == cudf::ewm_history::INFINITE) { + return compute_ewma_adjust(input, beta, stream, mr); + } else { + return compute_ewma_noadjust(input, beta, stream, mr); + } + }(); + return std::make_unique(cudf::data_type(cudf::type_to_id()), + input.size(), + result.release(), + rmm::device_buffer{}, + 0); + } +}; + +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return type_dispatcher(input.type(), ewma_functor{}, agg, input, stream, mr); +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/reductions/scan/scan.cuh b/cpp/src/reductions/scan/scan.cuh index aeb9e516cd4..6c237741ac3 100644 --- a/cpp/src/reductions/scan/scan.cuh +++ b/cpp/src/reductions/scan/scan.cuh @@ -36,6 +36,12 @@ std::pair mask_scan(column_view const& input_view rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +// exponentially weighted moving average of the input +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + template