diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index aab0a9b2d49..5fd68bfb26c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -502,6 +502,7 @@ add_library( src/reductions/product.cu src/reductions/reductions.cpp src/reductions/scan/rank_scan.cu + src/reductions/scan/ewm.cu src/reductions/scan/scan.cpp src/reductions/scan/scan_exclusive.cu src/reductions/scan/scan_inclusive.cu diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index d458c831f19..3c1023017be 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,6 +103,7 @@ class aggregation { NUNIQUE, ///< count number of unique elements NTH_ELEMENT, ///< get the nth element ROW_NUMBER, ///< get row-number of current index (relative to rolling window) + EWMA, ///< get exponential weighted moving average at current index RANK, ///< get rank of current index COLLECT_LIST, ///< collect values into a list COLLECT_SET, ///< collect values into a list without duplicate entries @@ -250,6 +251,8 @@ class segmented_reduce_aggregation : public virtual aggregation { enum class udf_type : bool { CUDA, PTX }; /// Type of correlation method. enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN }; +/// Type of treatment of EWM input values' first value +enum class ewm_history : int32_t { INFINITE, FINITE }; /// Factory to create a SUM aggregation /// @return A SUM aggregation object @@ -411,6 +414,42 @@ std::unique_ptr make_nth_element_aggregation( template std::unique_ptr make_row_number_aggregation(); +/** + * @brief Factory to create an EWMA aggregation + * + * `EWMA` returns a non-nullable column with the same type as the input, + * whose values are the exponentially weighted moving average of the input + * sequence. Let these values be known as the y_i. + * + * EWMA aggregations are parameterized by a center of mass (`com`) which + * affects the contribution of the previous values (y_0 ... y_{i-1}) in + * computing the y_i. + * + * EWMA aggregations are also parameterized by a history `cudf::ewm_history`. + * Special considerations have to be given to the mathematical treatment of + * the first value of the input sequence. There are two approaches to this, + * one which considers the first value of the sequence to be the exponential + * weighted moving average of some infinite history of data, and one which + * takes the first value to be the only datapoint known. These assumptions + * lead to two different formulas for the y_i. `ewm_history` selects which. + * + * EWMA aggregations have special null handling. Nulls have two effects. The + * first is to propagate forward the last valid value as far as it has been + * computed. This could be thought of as the nulls not affecting the average + * in any way. The second effect changes the way the y_i are computed. Since + * a moving average is conceptually designed to weight contributing values by + * their recency, nulls ought to count as valid periods even though they do + * not change the average. For example, if the input sequence is {1, NULL, 3} + * then when computing y_2 one should weigh y_0 as if it occurs two periods + * before y_2 rather than just one. + * + * @param center_of_mass the center of mass. + * @param history which assumption to make about the first value + * @return A EWM aggregation object + */ +template +std::unique_ptr make_ewma_aggregation(double const center_of_mass, ewm_history history); + /** * @brief Factory to create a RANK aggregation * diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index edee83783b8..843414817e3 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -76,6 +76,8 @@ class simple_aggregations_collector { // Declares the interface for the simple class nth_element_aggregation const& agg); virtual std::vector> visit(data_type col_type, class row_number_aggregation const& agg); + virtual std::vector> visit(data_type col_type, + class ewma_aggregation const& agg); virtual std::vector> visit(data_type col_type, class rank_aggregation const& agg); virtual std::vector> visit( @@ -141,6 +143,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer virtual void visit(class correlation_aggregation const& agg); virtual void visit(class tdigest_aggregation const& agg); virtual void visit(class merge_tdigest_aggregation const& agg); + virtual void visit(class ewma_aggregation const& agg); }; /** @@ -667,6 +670,40 @@ class row_number_aggregation final : public rolling_aggregation { void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } }; +/** + * @brief Derived class for specifying an ewma aggregation + */ +class ewma_aggregation final : public scan_aggregation { + public: + double const center_of_mass; + cudf::ewm_history history; + + ewma_aggregation(double const center_of_mass, cudf::ewm_history history) + : aggregation{EWMA}, center_of_mass{center_of_mass}, history{history} + { + } + + std::unique_ptr clone() const override + { + return std::make_unique(*this); + } + + std::vector> get_simple_aggregations( + data_type col_type, simple_aggregations_collector& collector) const override + { + return collector.visit(col_type, *this); + } + + bool is_equal(aggregation const& _other) const override + { + if (!this->aggregation::is_equal(_other)) { return false; } + auto const& other = dynamic_cast(_other); + return this->center_of_mass == other.center_of_mass and this->history == other.history; + } + + void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } +}; + /** * @brief Derived class for specifying a rank aggregation */ @@ -1336,6 +1373,11 @@ struct target_type_impl { using type = size_type; }; +template +struct target_type_impl { + using type = double; +}; + // Always use size_type accumulator for RANK template struct target_type_impl { @@ -1536,6 +1578,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind return f.template operator()(std::forward(args)...); case aggregation::MERGE_TDIGEST: return f.template operator()(std::forward(args)...); + case aggregation::EWMA: + return f.template operator()(std::forward(args)...); default: { #ifndef __CUDA_ARCH__ CUDF_FAIL("Unsupported aggregation."); diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index adee9147740..5422304c5cb 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -154,6 +154,12 @@ std::vector> simple_aggregations_collector::visit( return visit(col_type, static_cast(agg)); } +std::vector> simple_aggregations_collector::visit( + data_type col_type, ewma_aggregation const& agg) +{ + return visit(col_type, static_cast(agg)); +} + std::vector> simple_aggregations_collector::visit( data_type col_type, rank_aggregation const& agg) { @@ -333,6 +339,11 @@ void aggregation_finalizer::visit(row_number_aggregation const& agg) visit(static_cast(agg)); } +void aggregation_finalizer::visit(ewma_aggregation const& agg) +{ + visit(static_cast(agg)); +} + void aggregation_finalizer::visit(rank_aggregation const& agg) { visit(static_cast(agg)); @@ -665,6 +676,17 @@ std::unique_ptr make_row_number_aggregation() template std::unique_ptr make_row_number_aggregation(); template std::unique_ptr make_row_number_aggregation(); +/// Factory to create an EWMA aggregation +template +std::unique_ptr make_ewma_aggregation(double const com, cudf::ewm_history history) +{ + return std::make_unique(com, history); +} +template std::unique_ptr make_ewma_aggregation(double const com, + cudf::ewm_history history); +template std::unique_ptr make_ewma_aggregation( + double const com, cudf::ewm_history history); + /// Factory to create a RANK aggregation template std::unique_ptr make_rank_aggregation(rank_method method, diff --git a/cpp/src/reductions/scan/ewm.cu b/cpp/src/reductions/scan/ewm.cu new file mode 100644 index 00000000000..3fa2de450ad --- /dev/null +++ b/cpp/src/reductions/scan/ewm.cu @@ -0,0 +1,330 @@ +/* + * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "scan.cuh" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace cudf { +namespace detail { + +template +using pair_type = thrust::pair; + +/** + * @brief functor to be summed over in a prefix sum such that + * the recurrence in question is solved. See + * G. E. Blelloch. Prefix sums and their applications. Technical Report + * CMU-CS-90-190, Nov. 1990. S. 1.4 + * for details + */ +template +class recurrence_functor { + public: + __device__ pair_type operator()(pair_type ci, pair_type cj) + { + return {ci.first * cj.first, ci.second * cj.first + cj.second}; + } +}; + +template +struct ewma_functor_base { + T beta; + const pair_type IDENTITY{1.0, 0.0}; +}; + +template +struct ewma_adjust_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + // Not const to allow for updating the input value + auto [valid, exp, input] = data; + if (!valid) { return this->IDENTITY; } + if constexpr (not is_numerator) { input = 1; } + + // The value is non-null, but nulls preceding it + // must adjust the second element of the pair + T const beta = this->beta; + return {beta * ((exp != 0) ? pow(beta, exp) : 1), input}; + } +}; + +template +struct ewma_adjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(T const data) + { + T const beta = this->beta; + if constexpr (is_numerator) { + return {beta, data}; + } else { + return {beta, 1.0}; + } + } +}; + +template +struct ewma_noadjust_nulls_functor : public ewma_functor_base { + /* + In the null case, a denominator actually has to be computed. The formula is + y_{i+1} = (1 - alpha)x_{i-1} + alpha x_i, but really there is a "denominator" + which is the sum of the weights: alpha + (1 - alpha) == 1. If a null is + encountered, that means that the "previous" value is downweighted by a + factor (for each missing value). For example with a single null: + data = {x_0, NULL, x_1}, + y_2 = (1 - alpha)**2 x_0 + alpha * x_2 / (alpha + (1-alpha)**2) + + As such, the pairs must be updated before summing like the adjusted case to + properly downweight the previous values. But now but we also need to compute + the normalization factors and divide the results into them at the end. + */ + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index, valid, nullcnt] = data; + if (index == 0) { + return {beta, input}; + } else { + if (!valid) { return this->IDENTITY; } + // preceding value is valid, return normal pair + if (nullcnt == 0) { return {beta, (1.0 - beta) * input}; } + // one or more preceding values is null, adjust by how many + T const factor = (1.0 - beta) + pow(beta, nullcnt + 1); + return {(beta * (pow(beta, nullcnt)) / factor), ((1.0 - beta) * input) / factor}; + } + } +}; + +template +struct ewma_noadjust_no_nulls_functor : public ewma_functor_base { + __device__ pair_type operator()(thrust::tuple const data) + { + T const beta = this->beta; + auto const [input, index] = data; + if (index == 0) { + return {beta, input}; + } else { + return {beta, (1.0 - beta) * input}; + } + } +}; + +/** +* @brief Return an array whose values y_i are the number of null entries +* in between the last valid entry of the input and the current index. +* Example: {1, NULL, 3, 4, NULL, NULL, 7} + -> {0, 0 1, 0, 0, 1, 2} +*/ +rmm::device_uvector null_roll_up(column_view const& input, + rmm::cuda_stream_view stream) +{ + rmm::device_uvector output(input.size(), stream); + + auto device_view = column_device_view::create(input); + auto invalid_it = thrust::make_transform_iterator( + cudf::detail::make_validity_iterator(*device_view), + cuda::proclaim_return_type([] __device__(int valid) -> int { return 1 - valid; })); + + // valid mask {1, 0, 1, 0, 0, 1} leads to output array {0, 0, 1, 0, 1, 2} + thrust::inclusive_scan_by_key(rmm::exec_policy(stream), + invalid_it, + invalid_it + input.size() - 1, + invalid_it, + std::next(output.begin())); + return output; +} + +template +rmm::device_uvector compute_ewma_adjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + + if (input.has_nulls()) { + rmm::device_uvector nullcnt = null_roll_up(input, stream); + auto device_view = column_device_view::create(input); + auto valid_it = cudf::detail::make_validity_iterator(*device_view); + auto data = + thrust::make_zip_iterator(thrust::make_tuple(valid_it, nullcnt.begin(), input.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_adjust_nulls_functor{beta}, + recurrence_functor{}); + + } else { + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + input.begin(), + input.end(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + auto itr = thrust::make_counting_iterator(0); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + itr, + itr + input.size(), + pairs.begin(), + ewma_adjust_no_nulls_functor{beta}, + recurrence_functor{}); + } + + thrust::transform( + rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + output.begin(), + [] __device__(pair_type pair, T numerator) -> T { return numerator / pair.second; }); + + return output; +} + +template +rmm::device_uvector compute_ewma_noadjust(column_view const& input, + T const beta, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + rmm::device_uvector output(input.size(), stream); + rmm::device_uvector> pairs(input.size(), stream); + rmm::device_uvector nullcnt = + [&input, stream]() -> rmm::device_uvector { + if (input.has_nulls()) { + return null_roll_up(input, stream); + } else { + return rmm::device_uvector(input.size(), stream); + } + }(); + // denominators are all 1 and do not need to be computed + // pairs are all (beta, 1-beta x_i) except for the first one + + if (!input.has_nulls()) { + auto data = thrust::make_zip_iterator( + thrust::make_tuple(input.begin(), thrust::make_counting_iterator(0))); + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_no_nulls_functor{beta}, + recurrence_functor{}); + + } else { + auto device_view = column_device_view::create(input); + auto valid_it = detail::make_validity_iterator(*device_view); + + auto data = thrust::make_zip_iterator(thrust::make_tuple( + input.begin(), thrust::make_counting_iterator(0), valid_it, nullcnt.begin())); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + data, + data + input.size(), + pairs.begin(), + ewma_noadjust_nulls_functor{beta}, + recurrence_functor()); + } + + // copy the second elements to the output for now + thrust::transform(rmm::exec_policy(stream), + pairs.begin(), + pairs.end(), + output.begin(), + [] __device__(pair_type pair) -> T { return pair.second; }); + return output; +} + +struct ewma_functor { + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + CUDF_FAIL("Unsupported type for EWMA."); + } + + template ::value)> + std::unique_ptr operator()(scan_aggregation const& agg, + column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) + { + auto const ewma_agg = dynamic_cast(&agg); + auto const history = ewma_agg->history; + auto const center_of_mass = ewma_agg->center_of_mass; + + // center of mass is easier for the user, but the recurrences are + // better expressed in terms of the derived parameter `beta` + T const beta = center_of_mass / (center_of_mass + 1.0); + + auto result = [&]() { + if (history == cudf::ewm_history::INFINITE) { + return compute_ewma_adjust(input, beta, stream, mr); + } else { + return compute_ewma_noadjust(input, beta, stream, mr); + } + }(); + return std::make_unique(cudf::data_type(cudf::type_to_id()), + input.size(), + result.release(), + rmm::device_buffer{}, + 0); + } +}; + +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return type_dispatcher(input.type(), ewma_functor{}, agg, input, stream, mr); +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/reductions/scan/scan.cuh b/cpp/src/reductions/scan/scan.cuh index aeb9e516cd4..6c237741ac3 100644 --- a/cpp/src/reductions/scan/scan.cuh +++ b/cpp/src/reductions/scan/scan.cuh @@ -36,6 +36,12 @@ std::pair mask_scan(column_view const& input_view rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +// exponentially weighted moving average of the input +std::unique_ptr exponentially_weighted_moving_average(column_view const& input, + scan_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + template