From 9b532abfcec19d9b139994a94e3dd974c7501a34 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 8 Sep 2021 12:24:43 -0700 Subject: [PATCH] Support `VARIANCE` and `STD` aggregation in rolling op (#8809) Part 1 of #8695 This PR adds support to `STD` and `VARIANCE` rolling aggregations in libcudf. - Supported types include numeric types and fixed point types. Chrono types are not supported - see thread in issue. Implementation notes: - [Welford](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm)'s algorithm is used Authors: - Michael Wang (https://github.com/isVoid) Approvers: - MithunR (https://github.com/mythrocks) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/8809 --- cpp/include/cudf/aggregation.hpp | 5 + .../cudf/detail/aggregation/aggregation.hpp | 4 +- cpp/include/cudf/rolling.hpp | 9 +- cpp/include/cudf_test/type_lists.hpp | 14 +- cpp/src/aggregation/aggregation.cpp | 4 + cpp/src/rolling/rolling.cu | 32 +-- cpp/src/rolling/rolling_detail.cuh | 131 ++++++++++-- cpp/tests/rolling/rolling_test.cpp | 192 +++++++++++++++++- 8 files changed, 355 insertions(+), 36 deletions(-) diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index ff665e2706a..c302895880d 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -118,6 +118,7 @@ class rolling_aggregation : public virtual aggregation { protected: rolling_aggregation() {} + rolling_aggregation(aggregation::Kind a) : aggregation{a} {} }; /** @@ -203,6 +204,8 @@ std::unique_ptr make_m2_aggregation(); * * @param ddof Delta degrees of freedom. The divisor used in calculation of * `variance` is `N - ddof`, where `N` is the population size. + * + * @throw cudf::logic_error if input type is chrono or compound types. */ template std::unique_ptr make_variance_aggregation(size_type ddof = 1); @@ -212,6 +215,8 @@ std::unique_ptr make_variance_aggregation(size_type ddof = 1); * * @param ddof Delta degrees of freedom. The divisor used in calculation of * `std` is `N - ddof`, where `N` is the population size. + * + * @throw cudf::logic_error if input type is chrono or compound types. */ template std::unique_ptr make_std_aggregation(size_type ddof = 1); diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 4e4c63ae517..4cf902ef562 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -328,7 +328,7 @@ class m2_aggregation : public groupby_aggregation { /** * @brief Derived class for specifying a standard deviation/variance aggregation */ -class std_var_aggregation : public groupby_aggregation { +class std_var_aggregation : public rolling_aggregation, public groupby_aggregation { public: size_type _ddof; ///< Delta degrees of freedom @@ -342,7 +342,7 @@ class std_var_aggregation : public groupby_aggregation { size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); } protected: - std_var_aggregation(aggregation::Kind k, size_type ddof) : aggregation(k), _ddof{ddof} + std_var_aggregation(aggregation::Kind k, size_type ddof) : rolling_aggregation(k), _ddof{ddof} { CUDF_EXPECTS(k == aggregation::STD or k == aggregation::VARIANCE, "std_var_aggregation can accept only STD, VARIANCE"); diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index 4fb1b4a7319..21df994c644 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -41,9 +41,12 @@ namespace cudf { * - instead of storing NA/NaN for output rows that do not meet the minimum number of observations * this function updates the valid bitmask of the column to indicate which elements are valid. * - * The returned column for count aggregation always has `INT32` type. All other operators return a - * column of the same type as the input. Therefore it is suggested to convert integer column types - * (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`. + * Notes on return column types: + * - The returned column for count aggregation always has `INT32` type. + * - The returned column for VARIANCE/STD aggregations always has `FLOAT64` type. + * - All other operators return a column of the same type as the input. Therefore + * it is suggested to convert integer column types (especially low-precision integers) + * to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`. * * @param[in] input_col The input column * @param[in] preceding_window The static rolling window size in the backward direction. diff --git a/cpp/include/cudf_test/type_lists.hpp b/cpp/include/cudf_test/type_lists.hpp index 5c1b0c6c458..74688b7f133 100644 --- a/cpp/include/cudf_test/type_lists.hpp +++ b/cpp/include/cudf_test/type_lists.hpp @@ -287,11 +287,23 @@ using FixedWidthTypes = Concat; * Example: * ``` * // Invokes all typed fixture tests for all fixed-width types in libcudf - * TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypes); + * TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutFixedPoint); * ``` */ using FixedWidthTypesWithoutFixedPoint = Concat; +/** + * @brief Provides a list of all fixed-width element types except for the + * chrono types for use in GTest typed tests. + * + * Example: + * ``` + * // Invokes all typed fixture tests for all fixed-width types in libcudf + * TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutChrono); + * ``` + */ +using FixedWidthTypesWithoutChrono = Concat; + /** * @brief Provides a list of sortable types for use in GTest typed tests. * diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index f0c522257fb..c3d992e1181 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -465,6 +465,8 @@ std::unique_ptr make_variance_aggregation(size_type ddof) return std::make_unique(ddof); } template std::unique_ptr make_variance_aggregation(size_type ddof); +template std::unique_ptr make_variance_aggregation( + size_type ddof); template std::unique_ptr make_variance_aggregation( size_type ddof); @@ -475,6 +477,8 @@ std::unique_ptr make_std_aggregation(size_type ddof) return std::make_unique(ddof); } template std::unique_ptr make_std_aggregation(size_type ddof); +template std::unique_ptr make_std_aggregation( + size_type ddof); template std::unique_ptr make_std_aggregation( size_type ddof); diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index eb81f81ef12..ab0f78bcf5d 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -18,21 +18,6 @@ #include "rolling_detail.cuh" namespace cudf { - -// Applies a fixed-size rolling window function to the values in a column. -std::unique_ptr rolling_window(column_view const& input, - size_type preceding_window, - size_type following_window, - size_type min_periods, - rolling_aggregation const& agg, - rmm::mr::device_memory_resource* mr) -{ - auto defaults = - cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input; - return rolling_window( - input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr); -} - namespace detail { // Applies a fixed-size rolling window function to the values in a column. @@ -127,7 +112,8 @@ std::unique_ptr rolling_window(column_view const& input, } // namespace detail -// Applies a fixed-size rolling window function to the values in a column. +// Applies a fixed-size rolling window function to the values in a column, with default output +// specified std::unique_ptr rolling_window(column_view const& input, column_view const& default_outputs, size_type preceding_window, @@ -146,6 +132,20 @@ std::unique_ptr rolling_window(column_view const& input, mr); } +// Applies a fixed-size rolling window function to the values in a column, without default specified +std::unique_ptr rolling_window(column_view const& input, + size_type preceding_window, + size_type following_window, + size_type min_periods, + rolling_aggregation const& agg, + rmm::mr::device_memory_resource* mr) +{ + auto defaults = + cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input; + return rolling_window( + input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr); +} + // Applies a variable-size rolling window function to the values in a column. std::unique_ptr rolling_window(column_view const& input, column_view const& preceding_window, diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 64395a3522c..4cddfe46a66 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -26,21 +26,17 @@ #include #include #include -#include #include #include #include #include #include -#include +#include #include #include -#include #include #include #include -#include -#include #include #include #include @@ -54,13 +50,12 @@ #include #include +#include -#include -#include -#include #include #include -#include + +#include #include @@ -279,6 +274,86 @@ struct DeviceRollingCountAll { } }; +/** + * @brief Operator for applying a VAR rolling aggregation on a single window. + */ +template +struct DeviceRollingVariance { + size_type const min_periods; + size_type const ddof; + + // what operations do we support + template + static constexpr bool is_supported() + { + return is_fixed_width() and not is_chrono(); + } + + DeviceRollingVariance(size_type _min_periods, size_type _ddof) + : min_periods(_min_periods), ddof{_ddof} + { + } + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const&, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) const + { + using DeviceInputType = device_storage_type_t; + + // valid counts in the window + cudf::size_type const count = + has_nulls ? thrust::count_if(thrust::seq, + thrust::make_counting_iterator(start_index), + thrust::make_counting_iterator(end_index), + [&input](auto i) { return input.is_valid_nocheck(i); }) + : end_index - start_index; + + // Result will be null if any of the following conditions are met: + // - All inputs are null + // - Number of valid inputs is less than `min_periods` + bool output_is_valid = count > 0 and (count >= min_periods); + + if (output_is_valid) { + if (count >= ddof) { + // Welford algorithm + // See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + OutputType m{0}, m2{0}; + size_type running_count{0}; + + for (size_type i = start_index; i < end_index; i++) { + if (has_nulls and input.is_null_nocheck(i)) { continue; } + + OutputType const x = static_cast(input.element(i)); + + running_count++; + OutputType const tmp1 = x - m; + m += tmp1 / running_count; + OutputType const tmp2 = x - m; + m2 += tmp1 * tmp2; + } + if constexpr (is_fixed_point()) { + // For fixed_point types, the previous computed value used unscaled rep-value, + // the final result should be multiplied by the square of decimal `scale`. + OutputType scaleby = exp10(static_cast(input.type().scale())); + scaleby *= scaleby; + output.element(current_index) = m2 / (count - ddof) * scaleby; + } else { + output.element(current_index) = m2 / (count - ddof); + } + } else { + output.element(current_index) = + cuda::std::numeric_limits::signaling_NaN(); + } + } + + return output_is_valid; + } +}; + /** * @brief Operator for applying a ROW_NUMBER rolling aggregation on a single window. */ @@ -506,6 +581,11 @@ struct corresponding_rolling_operator { using type = DeviceRollingLead; }; +template +struct corresponding_rolling_operator { + using type = DeviceRollingVariance; +}; + template struct corresponding_rolling_operator { using type = DeviceRollingLag; @@ -527,15 +607,24 @@ struct create_rolling_operator< InputType, op, std::enable_if_t::type::is_supported()>> { - template < - typename T = InputType, - aggregation::Kind O = op, - std::enable_if_t* = nullptr> + template * = nullptr> auto operator()(size_type min_periods, rolling_aggregation const&) { return typename corresponding_rolling_operator::type(min_periods); } + template * = nullptr> + auto operator()(size_type min_periods, rolling_aggregation const& agg) + { + return DeviceRollingVariance{ + min_periods, dynamic_cast(agg)._ddof}; + } + template * = nullptr> @@ -632,6 +721,16 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre return {}; } + // STD aggregations depends on VARIANCE aggregation. Each element is applied + // with sqaured-root in the finalize() step. + std::vector> visit(data_type, + cudf::detail::std_aggregation const& agg) override + { + std::vector> aggs; + aggs.push_back(make_variance_aggregation(agg._ddof)); + return aggs; + } + // LEAD and LAG have custom behaviors for non fixed-width types. std::vector> visit( data_type col_type, cudf::detail::lead_lag_aggregation const& agg) override @@ -750,6 +849,12 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation lists_column_view(collected_list->view()), agg._nulls_equal, agg._nans_equal, stream, mr); } + // perform the element-wise square root operation on result of VARIANCE + void visit(cudf::detail::std_aggregation const& agg) override + { + result = detail::unary_operation(intermediate->view(), unary_operator::SQRT, stream, mr); + } + std::unique_ptr get_result() { CUDF_EXPECTS(result != nullptr, diff --git a/cpp/tests/rolling/rolling_test.cpp b/cpp/tests/rolling/rolling_test.cpp index ec88500fde1..a83e5886df5 100644 --- a/cpp/tests/rolling/rolling_test.cpp +++ b/cpp/tests/rolling/rolling_test.cpp @@ -23,15 +23,21 @@ #include #include +#include #include #include #include #include +#include +#include #include +#include #include #include +#include +#include #include using cudf::bitmask_type; @@ -396,7 +402,14 @@ class RollingTest : public cudf::test::BaseFixture { } }; -// // ------------- expected failures -------------------- +template +class RollingVarStdTest : public cudf::test::BaseFixture { +}; + +TYPED_TEST_CASE(RollingVarStdTest, cudf::test::FixedWidthTypesWithoutChrono); + +class RollingtVarStdTestUntyped : public cudf::test::BaseFixture { +}; class RollingErrorTest : public cudf::test::BaseFixture { }; @@ -602,6 +615,118 @@ TYPED_TEST(RollingTest, SimpleStatic) this->run_test_col_agg(input, window, window, 1); } +TYPED_TEST(RollingVarStdTest, SimpleStaticVarianceStd) +{ +#define XXX 0 // NULL stub + + using ResultType = double; + + double const nan = std::numeric_limits::signaling_NaN(); + + size_type const ddof = 1, min_periods = 0, preceding_window = 2, following_window = 1; + + auto const col_data = + cudf::test::make_type_param_vector({XXX, XXX, 9, 5, XXX, XXX, XXX, 0, 8, 5, 8}); + const std::vector col_mask = {0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1}; + + auto const expected_var = + cudf::is_boolean() + ? std::vector{XXX, nan, 0, 0, nan, XXX, nan, 0.5, 0.3333333333333333, 0, 0} + : std::vector{XXX, nan, 8, 8, nan, XXX, nan, 32, 16.33333333333333, 3, 4.5}; + std::vector expected_std(expected_var.size()); + std::transform(expected_var.begin(), expected_var.end(), expected_std.begin(), [](auto const& x) { + return std::sqrt(x); + }); + + const std::vector expected_mask = {0, /* all null window */ + 1, /* 0 div 0, nan */ + 1, + 1, + 1, /* 0 div 0, nan */ + 0, /* all null window */ + 1, /* 0 div 0, nan */ + 1, + 1, + 1, + 1}; + + fixed_width_column_wrapper input(col_data.begin(), col_data.end(), col_mask.begin()); + fixed_width_column_wrapper var_expect( + expected_var.begin(), expected_var.end(), expected_mask.begin()); + fixed_width_column_wrapper std_expect( + expected_std.begin(), expected_std.end(), expected_mask.begin()); + + std::unique_ptr var_result, std_result; + // static sizes + EXPECT_NO_THROW(var_result = cudf::rolling_window(input, + preceding_window, + following_window, + min_periods, + dynamic_cast( + *cudf::make_variance_aggregation(ddof)));); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*var_result, var_expect); + + EXPECT_NO_THROW(std_result = cudf::rolling_window(input, + preceding_window, + following_window, + min_periods, + dynamic_cast( + *cudf::make_std_aggregation(ddof)));); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*std_result, std_expect); + +#undef XXX +} + +TEST_F(RollingtVarStdTestUntyped, SimpleStaticVarianceStdInfNaN) +{ +#define XXX 0. // NULL stub + + using ResultType = double; + + double const inf = std::numeric_limits::infinity(); + double const nan = std::numeric_limits::signaling_NaN(); + size_type const ddof = 1, min_periods = 1, preceding_window = 3, following_window = 0; + + auto const col_data = + cudf::test::make_type_param_vector({5., 4., XXX, inf, 4., 8., 0., nan, XXX, 5.}); + const std::vector col_mask = {1, 1, 0, 1, 1, 1, 1, 1, 0, 1}; + + auto const expected_var = + std::vector{nan, 0.5, 0.5, nan, nan, nan, 16, nan, nan, nan}; + std::vector expected_std(expected_var.size()); + std::transform(expected_var.begin(), expected_var.end(), expected_std.begin(), [](auto const& x) { + return std::sqrt(x); + }); + + const std::vector expected_mask = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + + fixed_width_column_wrapper input(col_data.begin(), col_data.end(), col_mask.begin()); + fixed_width_column_wrapper var_expect( + expected_var.begin(), expected_var.end(), expected_mask.begin()); + fixed_width_column_wrapper std_expect( + expected_std.begin(), expected_std.end(), expected_mask.begin()); + + std::unique_ptr var_result, std_result; + // static sizes + EXPECT_NO_THROW(var_result = cudf::rolling_window(input, + preceding_window, + following_window, + min_periods, + dynamic_cast( + *cudf::make_variance_aggregation(ddof)));); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*var_result, var_expect); + + EXPECT_NO_THROW(std_result = cudf::rolling_window(input, + preceding_window, + following_window, + min_periods, + dynamic_cast( + *cudf::make_std_aggregation(ddof)));); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*std_result, std_expect); + +#undef XXX +} + // negative sizes TYPED_TEST(RollingTest, NegativeWindowSizes) { @@ -1118,6 +1243,71 @@ TYPED_TEST(FixedPointTests, MinMaxCountLagLeadNulls) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_rowno, rowno->view()); } +TYPED_TEST(FixedPointTests, VarStd) +{ + using namespace numeric; + using namespace cudf; + using decimalXX = TypeParam; + using RepType = cudf::device_storage_type_t; + using fp_wrapper = cudf::test::fixed_point_column_wrapper; + using fw_wrapper = cudf::test::fixed_width_column_wrapper; + + double const nan = std::numeric_limits::signaling_NaN(); + double const inf = std::numeric_limits::infinity(); + size_type preceding_window{3}, following_window{0}, min_periods{1}, ddof{2}; + + // The variance of `input` given `scale` == 0 + std::vector result_base_v{ + nan, inf, 1882804.66666666667, 1928018.666666666667, 1874.6666666666667, 2.0}; + std::vector result_mask_v{1, 1, 1, 1, 1, 1}; + + // var tests + for (int32_t s = -2; s <= 2; s++) { + auto const scale = scale_type{s}; + auto const input = fp_wrapper{{42, 1729, 55, 3, 1, 2}, {1, 1, 1, 1, 1, 1}, scale}; + + auto got = rolling_window( + input, + preceding_window, + following_window, + min_periods, + dynamic_cast(*cudf::make_variance_aggregation(ddof))); + + std::vector result_scaled_v(result_base_v.size()); + std::transform( + result_base_v.begin(), result_base_v.end(), result_scaled_v.begin(), [&s](auto x) { + // When values are scaled by 10^n, the variance is scaled by 10^2n. + return x * exp10(s) * exp10(s); + }); + fw_wrapper expect(result_scaled_v.begin(), result_scaled_v.end(), result_mask_v.begin()); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expect, *got); + } + + // std tests + for (int32_t s = -2; s <= 2; s++) { + auto const scale = scale_type{s}; + auto const input = fp_wrapper{{42, 1729, 55, 3, 1, 2}, {1, 1, 1, 1, 1, 1}, scale}; + + auto got = rolling_window( + input, + preceding_window, + following_window, + min_periods, + dynamic_cast(*cudf::make_std_aggregation(ddof))); + + std::vector result_scaled_v(result_base_v.size()); + std::transform( + result_base_v.begin(), result_base_v.end(), result_scaled_v.begin(), [&s](auto x) { + // When values are scaled by 10^n, the variance is scaled by 10^2n. + return std::sqrt(x * exp10(s) * exp10(s)); + }); + fw_wrapper expect(result_scaled_v.begin(), result_scaled_v.end(), result_mask_v.begin()); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expect, *got); + } +} + class RollingDictionaryTest : public cudf::test::BaseFixture { };