Skip to content

Commit

Permalink
cuDF/libcudf exponentially weighted moving averages (#9027)
Browse files Browse the repository at this point in the history
Adds an exponentially weighted moving average aggregation to `cudf::scan` and plumbs it up through `cudf.Series.ewm`, similar to `pandas.Series.ewm`.

partially resolves #1263

Authors:
  - https://github.com/brandon-b-miller
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Bradley Dice (https://github.com/bdice)

URL: #9027
  • Loading branch information
brandon-b-miller authored Jun 24, 2024
1 parent f583879 commit bd76bf6
Show file tree
Hide file tree
Showing 21 changed files with 892 additions and 7 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ add_library(
src/reductions/product.cu
src/reductions/reductions.cpp
src/reductions/scan/rank_scan.cu
src/reductions/scan/ewm.cu
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
Expand Down
41 changes: 40 additions & 1 deletion cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,7 @@ class aggregation {
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
Expand Down Expand Up @@ -250,6 +251,8 @@ class segmented_reduce_aggregation : public virtual aggregation {
enum class udf_type : bool { CUDA, PTX };
/// Type of correlation method.
enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN };
/// Type of treatment of EWM input values' first value
enum class ewm_history : int32_t { INFINITE, FINITE };

/// Factory to create a SUM aggregation
/// @return A SUM aggregation object
Expand Down Expand Up @@ -411,6 +414,42 @@ std::unique_ptr<Base> make_nth_element_aggregation(
template <typename Base = aggregation>
std::unique_ptr<Base> make_row_number_aggregation();

/**
* @brief Factory to create an EWMA aggregation
*
* `EWMA` returns a non-nullable column with the same type as the input,
* whose values are the exponentially weighted moving average of the input
* sequence. Let these values be known as the y_i.
*
* EWMA aggregations are parameterized by a center of mass (`com`) which
* affects the contribution of the previous values (y_0 ... y_{i-1}) in
* computing the y_i.
*
* EWMA aggregations are also parameterized by a history `cudf::ewm_history`.
* Special considerations have to be given to the mathematical treatment of
* the first value of the input sequence. There are two approaches to this,
* one which considers the first value of the sequence to be the exponential
* weighted moving average of some infinite history of data, and one which
* takes the first value to be the only datapoint known. These assumptions
* lead to two different formulas for the y_i. `ewm_history` selects which.
*
* EWMA aggregations have special null handling. Nulls have two effects. The
* first is to propagate forward the last valid value as far as it has been
* computed. This could be thought of as the nulls not affecting the average
* in any way. The second effect changes the way the y_i are computed. Since
* a moving average is conceptually designed to weight contributing values by
* their recency, nulls ought to count as valid periods even though they do
* not change the average. For example, if the input sequence is {1, NULL, 3}
* then when computing y_2 one should weigh y_0 as if it occurs two periods
* before y_2 rather than just one.
*
* @param center_of_mass the center of mass.
* @param history which assumption to make about the first value
* @return A EWM aggregation object
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_ewma_aggregation(double const center_of_mass, ewm_history history);

/**
* @brief Factory to create a RANK aggregation
*
Expand Down
44 changes: 44 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class simple_aggregations_collector { // Declares the interface for the simple
class nth_element_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class row_number_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class ewma_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class rank_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
Expand Down Expand Up @@ -141,6 +143,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class correlation_aggregation const& agg);
virtual void visit(class tdigest_aggregation const& agg);
virtual void visit(class merge_tdigest_aggregation const& agg);
virtual void visit(class ewma_aggregation const& agg);
};

/**
Expand Down Expand Up @@ -667,6 +670,40 @@ class row_number_aggregation final : public rolling_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived class for specifying an ewma aggregation
*/
class ewma_aggregation final : public scan_aggregation {
public:
double const center_of_mass;
cudf::ewm_history history;

ewma_aggregation(double const center_of_mass, cudf::ewm_history history)
: aggregation{EWMA}, center_of_mass{center_of_mass}, history{history}
{
}

std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<ewma_aggregation>(*this);
}

std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}

bool is_equal(aggregation const& _other) const override
{
if (!this->aggregation::is_equal(_other)) { return false; }
auto const& other = dynamic_cast<ewma_aggregation const&>(_other);
return this->center_of_mass == other.center_of_mass and this->history == other.history;
}

void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived class for specifying a rank aggregation
*/
Expand Down Expand Up @@ -1336,6 +1373,11 @@ struct target_type_impl<Source, aggregation::ROW_NUMBER> {
using type = size_type;
};

template <typename Source>
struct target_type_impl<Source, aggregation::EWMA> {
using type = double;
};

// Always use size_type accumulator for RANK
template <typename Source>
struct target_type_impl<Source, aggregation::RANK> {
Expand Down Expand Up @@ -1536,6 +1578,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind
return f.template operator()<aggregation::TDIGEST>(std::forward<Ts>(args)...);
case aggregation::MERGE_TDIGEST:
return f.template operator()<aggregation::MERGE_TDIGEST>(std::forward<Ts>(args)...);
case aggregation::EWMA:
return f.template operator()<aggregation::EWMA>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
return visit(col_type, static_cast<aggregation const&>(agg));
}

std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
data_type col_type, ewma_aggregation const& agg)
{
return visit(col_type, static_cast<aggregation const&>(agg));
}

std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
data_type col_type, rank_aggregation const& agg)
{
Expand Down Expand Up @@ -333,6 +339,11 @@ void aggregation_finalizer::visit(row_number_aggregation const& agg)
visit(static_cast<aggregation const&>(agg));
}

void aggregation_finalizer::visit(ewma_aggregation const& agg)
{
visit(static_cast<aggregation const&>(agg));
}

void aggregation_finalizer::visit(rank_aggregation const& agg)
{
visit(static_cast<aggregation const&>(agg));
Expand Down Expand Up @@ -665,6 +676,17 @@ std::unique_ptr<Base> make_row_number_aggregation()
template std::unique_ptr<aggregation> make_row_number_aggregation<aggregation>();
template std::unique_ptr<rolling_aggregation> make_row_number_aggregation<rolling_aggregation>();

/// Factory to create an EWMA aggregation
template <typename Base>
std::unique_ptr<Base> make_ewma_aggregation(double const com, cudf::ewm_history history)
{
return std::make_unique<detail::ewma_aggregation>(com, history);
}
template std::unique_ptr<aggregation> make_ewma_aggregation<aggregation>(double const com,
cudf::ewm_history history);
template std::unique_ptr<scan_aggregation> make_ewma_aggregation<scan_aggregation>(
double const com, cudf::ewm_history history);

/// Factory to create a RANK aggregation
template <typename Base>
std::unique_ptr<Base> make_rank_aggregation(rank_method method,
Expand Down
Loading

0 comments on commit bd76bf6

Please sign in to comment.