Skip to content

Commit

Permalink
Support VARIANCE and STD aggregation in rolling op (#8809)
Browse files Browse the repository at this point in the history
Part 1 of #8695 

This PR adds support to `STD` and `VARIANCE` rolling aggregations in libcudf.
- Supported types include numeric types and fixed point types. Chrono types are not supported - see thread in issue.

Implementation notes:
- [Welford](https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm)'s algorithm is used

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - MithunR (https://github.com/mythrocks)
  - David Wendt (https://github.com/davidwendt)

URL: #8809
  • Loading branch information
isVoid authored Sep 8, 2021
1 parent 42a70f3 commit 9b532ab
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 36 deletions.
5 changes: 5 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class rolling_aggregation : public virtual aggregation {

protected:
rolling_aggregation() {}
rolling_aggregation(aggregation::Kind a) : aggregation{a} {}
};

/**
Expand Down Expand Up @@ -203,6 +204,8 @@ std::unique_ptr<Base> make_m2_aggregation();
*
* @param ddof Delta degrees of freedom. The divisor used in calculation of
* `variance` is `N - ddof`, where `N` is the population size.
*
* @throw cudf::logic_error if input type is chrono or compound types.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_variance_aggregation(size_type ddof = 1);
Expand All @@ -212,6 +215,8 @@ std::unique_ptr<Base> make_variance_aggregation(size_type ddof = 1);
*
* @param ddof Delta degrees of freedom. The divisor used in calculation of
* `std` is `N - ddof`, where `N` is the population size.
*
* @throw cudf::logic_error if input type is chrono or compound types.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_std_aggregation(size_type ddof = 1);
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class m2_aggregation : public groupby_aggregation {
/**
* @brief Derived class for specifying a standard deviation/variance aggregation
*/
class std_var_aggregation : public groupby_aggregation {
class std_var_aggregation : public rolling_aggregation, public groupby_aggregation {
public:
size_type _ddof; ///< Delta degrees of freedom

Expand All @@ -342,7 +342,7 @@ class std_var_aggregation : public groupby_aggregation {
size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); }

protected:
std_var_aggregation(aggregation::Kind k, size_type ddof) : aggregation(k), _ddof{ddof}
std_var_aggregation(aggregation::Kind k, size_type ddof) : rolling_aggregation(k), _ddof{ddof}
{
CUDF_EXPECTS(k == aggregation::STD or k == aggregation::VARIANCE,
"std_var_aggregation can accept only STD, VARIANCE");
Expand Down
9 changes: 6 additions & 3 deletions cpp/include/cudf/rolling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ namespace cudf {
* - instead of storing NA/NaN for output rows that do not meet the minimum number of observations
* this function updates the valid bitmask of the column to indicate which elements are valid.
*
* The returned column for count aggregation always has `INT32` type. All other operators return a
* column of the same type as the input. Therefore it is suggested to convert integer column types
* (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
* Notes on return column types:
* - The returned column for count aggregation always has `INT32` type.
* - The returned column for VARIANCE/STD aggregations always has `FLOAT64` type.
* - All other operators return a column of the same type as the input. Therefore
* it is suggested to convert integer column types (especially low-precision integers)
* to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
*
* @param[in] input_col The input column
* @param[in] preceding_window The static rolling window size in the backward direction.
Expand Down
14 changes: 13 additions & 1 deletion cpp/include/cudf_test/type_lists.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,23 @@ using FixedWidthTypes = Concat<NumericTypes, ChronoTypes, FixedPointTypes>;
* Example:
* ```
* // Invokes all typed fixture tests for all fixed-width types in libcudf
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypes);
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutFixedPoint);
* ```
*/
using FixedWidthTypesWithoutFixedPoint = Concat<NumericTypes, ChronoTypes>;

/**
* @brief Provides a list of all fixed-width element types except for the
* chrono types for use in GTest typed tests.
*
* Example:
* ```
* // Invokes all typed fixture tests for all fixed-width types in libcudf
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutChrono);
* ```
*/
using FixedWidthTypesWithoutChrono = Concat<NumericTypes, FixedPointTypes>;

/**
* @brief Provides a list of sortable types for use in GTest typed tests.
*
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ std::unique_ptr<Base> make_variance_aggregation(size_type ddof)
return std::make_unique<detail::var_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_variance_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_variance_aggregation<rolling_aggregation>(
size_type ddof);
template std::unique_ptr<groupby_aggregation> make_variance_aggregation<groupby_aggregation>(
size_type ddof);

Expand All @@ -475,6 +477,8 @@ std::unique_ptr<Base> make_std_aggregation(size_type ddof)
return std::make_unique<detail::std_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_std_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_std_aggregation<rolling_aggregation>(
size_type ddof);
template std::unique_ptr<groupby_aggregation> make_std_aggregation<groupby_aggregation>(
size_type ddof);

Expand Down
32 changes: 16 additions & 16 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@
#include "rolling_detail.cuh"

namespace cudf {

// Applies a fixed-size rolling window function to the values in a column.
std::unique_ptr<column> rolling_window(column_view const& input,
size_type preceding_window,
size_type following_window,
size_type min_periods,
rolling_aggregation const& agg,
rmm::mr::device_memory_resource* mr)
{
auto defaults =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return rolling_window(
input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr);
}

namespace detail {

// Applies a fixed-size rolling window function to the values in a column.
Expand Down Expand Up @@ -127,7 +112,8 @@ std::unique_ptr<column> rolling_window(column_view const& input,

} // namespace detail

// Applies a fixed-size rolling window function to the values in a column.
// Applies a fixed-size rolling window function to the values in a column, with default output
// specified
std::unique_ptr<column> rolling_window(column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
Expand All @@ -146,6 +132,20 @@ std::unique_ptr<column> rolling_window(column_view const& input,
mr);
}

// Applies a fixed-size rolling window function to the values in a column, without default specified
std::unique_ptr<column> rolling_window(column_view const& input,
size_type preceding_window,
size_type following_window,
size_type min_periods,
rolling_aggregation const& agg,
rmm::mr::device_memory_resource* mr)
{
auto defaults =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return rolling_window(
input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr);
}

// Applies a variable-size rolling window function to the values in a column.
std::unique_ptr<column> rolling_window(column_view const& input,
column_view const& preceding_window,
Expand Down
131 changes: 118 additions & 13 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,17 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby/sort_helper.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/lists/detail/drop_list_duplicates.hpp>
#include <cudf/rolling.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
Expand All @@ -54,13 +50,12 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/binary_search.h>
#include <thrust/detail/execution_policy.h>
#include <thrust/execution_policy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cuda/std/limits>

#include <memory>

Expand Down Expand Up @@ -279,6 +274,86 @@ struct DeviceRollingCountAll {
}
};

/**
* @brief Operator for applying a VAR rolling aggregation on a single window.
*/
template <typename InputType>
struct DeviceRollingVariance {
size_type const min_periods;
size_type const ddof;

// what operations do we support
template <typename T = InputType, aggregation::Kind O = aggregation::VARIANCE>
static constexpr bool is_supported()
{
return is_fixed_width<InputType>() and not is_chrono<InputType>();
}

DeviceRollingVariance(size_type _min_periods, size_type _ddof)
: min_periods(_min_periods), ddof{_ddof}
{
}

template <typename OutputType, bool has_nulls>
bool __device__ operator()(column_device_view const& input,
column_device_view const&,
mutable_column_device_view& output,
size_type start_index,
size_type end_index,
size_type current_index) const
{
using DeviceInputType = device_storage_type_t<InputType>;

// valid counts in the window
cudf::size_type const count =
has_nulls ? thrust::count_if(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
[&input](auto i) { return input.is_valid_nocheck(i); })
: end_index - start_index;

// Result will be null if any of the following conditions are met:
// - All inputs are null
// - Number of valid inputs is less than `min_periods`
bool output_is_valid = count > 0 and (count >= min_periods);

if (output_is_valid) {
if (count >= ddof) {
// Welford algorithm
// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
OutputType m{0}, m2{0};
size_type running_count{0};

for (size_type i = start_index; i < end_index; i++) {
if (has_nulls and input.is_null_nocheck(i)) { continue; }

OutputType const x = static_cast<OutputType>(input.element<DeviceInputType>(i));

running_count++;
OutputType const tmp1 = x - m;
m += tmp1 / running_count;
OutputType const tmp2 = x - m;
m2 += tmp1 * tmp2;
}
if constexpr (is_fixed_point<InputType>()) {
// For fixed_point types, the previous computed value used unscaled rep-value,
// the final result should be multiplied by the square of decimal `scale`.
OutputType scaleby = exp10(static_cast<double>(input.type().scale()));
scaleby *= scaleby;
output.element<OutputType>(current_index) = m2 / (count - ddof) * scaleby;
} else {
output.element<OutputType>(current_index) = m2 / (count - ddof);
}
} else {
output.element<OutputType>(current_index) =
cuda::std::numeric_limits<OutputType>::signaling_NaN();
}
}

return output_is_valid;
}
};

/**
* @brief Operator for applying a ROW_NUMBER rolling aggregation on a single window.
*/
Expand Down Expand Up @@ -506,6 +581,11 @@ struct corresponding_rolling_operator<InputType, aggregation::Kind::LEAD> {
using type = DeviceRollingLead<InputType>;
};

template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::VARIANCE> {
using type = DeviceRollingVariance<InputType>;
};

template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::LAG> {
using type = DeviceRollingLag<InputType>;
Expand All @@ -527,15 +607,24 @@ struct create_rolling_operator<
InputType,
op,
std::enable_if_t<corresponding_rolling_operator<InputType, op>::type::is_supported()>> {
template <
typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG>* = nullptr>
template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG &&
O != aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const&)
{
return typename corresponding_rolling_operator<InputType, op>::type(min_periods);
}

template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O == aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const& agg)
{
return DeviceRollingVariance<InputType>{
min_periods, dynamic_cast<cudf::detail::var_aggregation const&>(agg)._ddof};
}

template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O == aggregation::Kind::LEAD>* = nullptr>
Expand Down Expand Up @@ -632,6 +721,16 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre
return {};
}

// STD aggregations depends on VARIANCE aggregation. Each element is applied
// with sqaured-root in the finalize() step.
std::vector<std::unique_ptr<aggregation>> visit(data_type,
cudf::detail::std_aggregation const& agg) override
{
std::vector<std::unique_ptr<aggregation>> aggs;
aggs.push_back(make_variance_aggregation(agg._ddof));
return aggs;
}

// LEAD and LAG have custom behaviors for non fixed-width types.
std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, cudf::detail::lead_lag_aggregation const& agg) override
Expand Down Expand Up @@ -750,6 +849,12 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation
lists_column_view(collected_list->view()), agg._nulls_equal, agg._nans_equal, stream, mr);
}

// perform the element-wise square root operation on result of VARIANCE
void visit(cudf::detail::std_aggregation const& agg) override
{
result = detail::unary_operation(intermediate->view(), unary_operator::SQRT, stream, mr);
}

std::unique_ptr<column> get_result()
{
CUDF_EXPECTS(result != nullptr,
Expand Down
Loading

0 comments on commit 9b532ab

Please sign in to comment.