Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support VARIANCE and STD aggregation in rolling op #8809

Merged
merged 40 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7cc6630
initial
isVoid Jul 21, 2021
6f64691
Compiles
isVoid Jul 21, 2021
2ab88fe
style
isVoid Jul 22, 2021
2f63568
clean up
isVoid Jul 23, 2021
16cb7a8
clean up
isVoid Jul 23, 2021
ddd59f0
header cleanup
isVoid Jul 23, 2021
bcd00f0
.
isVoid Jul 23, 2021
da8b755
More cleanup
isVoid Jul 23, 2021
3ff0d8d
revert ptx changes
isVoid Jul 23, 2021
fc00d8f
Static tests
isVoid Jul 23, 2021
1259df6
undo python changes
isVoid Jul 23, 2021
3539e1b
.
isVoid Jul 23, 2021
07aa54d
docs
isVoid Jul 23, 2021
d2a6407
Merge branch 'branch-21.10' of https://github.com/rapidsai/cudf into …
isVoid Jul 25, 2021
885d66e
remove count==1 restriction
isVoid Jul 25, 2021
68ab4ae
add ddof tests
isVoid Jul 25, 2021
1e1c8cd
docfix
isVoid Jul 26, 2021
1288a56
fixed_point fix
isVoid Jul 26, 2021
3a9e589
docs
isVoid Jul 26, 2021
490539c
docfix
isVoid Jul 26, 2021
394f0f0
docfix
isVoid Jul 27, 2021
bc0920b
remove thrust::reduce
isVoid Jul 27, 2021
934f104
.
isVoid Jul 27, 2021
b215199
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
a3af3e9
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
ae33de0
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
645a172
address review comments
isVoid Jul 29, 2021
d94b8db
Apply suggestions from code review
isVoid Jul 29, 2021
0fe4a87
Add nan tests
isVoid Jul 30, 2021
9ce41ae
Remove auto generated column test
isVoid Jul 30, 2021
b62a40d
Merge branch 'branch-21.10' of https://github.com/rapidsai/cudf into …
isVoid Aug 19, 2021
43df716
count==0 case maps to invalid output.
isVoid Aug 21, 2021
d62eb00
Apply review comments: div by zero result is valid element
isVoid Aug 25, 2021
0b78ab3
Update cpp/src/rolling/rolling_detail.cuh
isVoid Aug 25, 2021
e3f89df
ddof > count situation is valid but nan
isVoid Aug 30, 2021
a504017
Merge branch 'rolling_std' of github.com:isVoid/cudf into rolling_std
isVoid Aug 30, 2021
d3cedb1
make operator constant
isVoid Aug 31, 2021
75e8140
header cleanup
isVoid Sep 1, 2021
d5359ba
Update cpp/tests/rolling/rolling_test.cpp
isVoid Sep 1, 2021
a14d77e
style
isVoid Sep 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class rolling_aggregation : public virtual aggregation {

protected:
rolling_aggregation() {}
rolling_aggregation(aggregation::Kind a) : aggregation{a} {}
};

/**
Expand Down Expand Up @@ -203,6 +204,8 @@ std::unique_ptr<Base> make_m2_aggregation();
*
* @param ddof Delta degrees of freedom. The divisor used in calculation of
* `variance` is `N - ddof`, where `N` is the population size.
*
* @throw cudf::logic_error if input type is chrono or compound types.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_variance_aggregation(size_type ddof = 1);
Expand All @@ -212,6 +215,8 @@ std::unique_ptr<Base> make_variance_aggregation(size_type ddof = 1);
*
* @param ddof Delta degrees of freedom. The divisor used in calculation of
* `std` is `N - ddof`, where `N` is the population size.
*
* @throw cudf::logic_error if input type is chrono or compound types.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_std_aggregation(size_type ddof = 1);
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class m2_aggregation : public groupby_aggregation {
/**
* @brief Derived class for specifying a standard deviation/variance aggregation
*/
class std_var_aggregation : public groupby_aggregation {
class std_var_aggregation : public rolling_aggregation, public groupby_aggregation {
public:
size_type _ddof; ///< Delta degrees of freedom

Expand All @@ -342,7 +342,7 @@ class std_var_aggregation : public groupby_aggregation {
size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); }

protected:
std_var_aggregation(aggregation::Kind k, size_type ddof) : aggregation(k), _ddof{ddof}
std_var_aggregation(aggregation::Kind k, size_type ddof) : rolling_aggregation(k), _ddof{ddof}
{
CUDF_EXPECTS(k == aggregation::STD or k == aggregation::VARIANCE,
"std_var_aggregation can accept only STD, VARIANCE");
Expand Down
9 changes: 6 additions & 3 deletions cpp/include/cudf/rolling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ namespace cudf {
* - instead of storing NA/NaN for output rows that do not meet the minimum number of observations
* this function updates the valid bitmask of the column to indicate which elements are valid.
*
* The returned column for count aggregation always has `INT32` type. All other operators return a
* column of the same type as the input. Therefore it is suggested to convert integer column types
* (especially low-precision integers) to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
* Notes on return column types:
* - The returned column for count aggregation always has `INT32` type.
* - The returned column for VARIANCE/STD aggregations always has `FLOAT64` type.
* - All other operators return a column of the same type as the input. Therefore
* it is suggested to convert integer column types (especially low-precision integers)
* to `FLOAT32` or `FLOAT64` before doing a rolling `MEAN`.
*
* @param[in] input_col The input column
* @param[in] preceding_window The static rolling window size in the backward direction.
Expand Down
14 changes: 13 additions & 1 deletion cpp/include/cudf_test/type_lists.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,23 @@ using FixedWidthTypes = Concat<NumericTypes, ChronoTypes, FixedPointTypes>;
* Example:
* ```
* // Invokes all typed fixture tests for all fixed-width types in libcudf
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypes);
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutFixedPoint);
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
* ```
*/
using FixedWidthTypesWithoutFixedPoint = Concat<NumericTypes, ChronoTypes>;

/**
* @brief Provides a list of all fixed-width element types except for the
* chrono types for use in GTest typed tests.
*
* Example:
* ```
* // Invokes all typed fixture tests for all fixed-width types in libcudf
* TYPED_TEST_CASE(MyTypedFixture, cudf::test::FixedWidthTypesWithoutChrono);
* ```
*/
using FixedWidthTypesWithoutChrono = Concat<NumericTypes, FixedPointTypes>;

/**
* @brief Provides a list of sortable types for use in GTest typed tests.
*
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ std::unique_ptr<Base> make_variance_aggregation(size_type ddof)
return std::make_unique<detail::var_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_variance_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_variance_aggregation<rolling_aggregation>(
size_type ddof);
template std::unique_ptr<groupby_aggregation> make_variance_aggregation<groupby_aggregation>(
size_type ddof);

Expand All @@ -475,6 +477,8 @@ std::unique_ptr<Base> make_std_aggregation(size_type ddof)
return std::make_unique<detail::std_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_std_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_std_aggregation<rolling_aggregation>(
size_type ddof);
template std::unique_ptr<groupby_aggregation> make_std_aggregation<groupby_aggregation>(
size_type ddof);

Expand Down
32 changes: 16 additions & 16 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@
#include "rolling_detail.cuh"

namespace cudf {

// Applies a fixed-size rolling window function to the values in a column.
std::unique_ptr<column> rolling_window(column_view const& input,
size_type preceding_window,
size_type following_window,
size_type min_periods,
rolling_aggregation const& agg,
rmm::mr::device_memory_resource* mr)
{
auto defaults =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return rolling_window(
input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr);
}

namespace detail {

// Applies a fixed-size rolling window function to the values in a column.
Expand Down Expand Up @@ -127,7 +112,8 @@ std::unique_ptr<column> rolling_window(column_view const& input,

} // namespace detail

// Applies a fixed-size rolling window function to the values in a column.
// Applies a fixed-size rolling window function to the values in a column, with default output
// specified
std::unique_ptr<column> rolling_window(column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
Expand All @@ -146,6 +132,20 @@ std::unique_ptr<column> rolling_window(column_view const& input,
mr);
}

// Applies a fixed-size rolling window function to the values in a column, without default specified
std::unique_ptr<column> rolling_window(column_view const& input,
size_type preceding_window,
size_type following_window,
size_type min_periods,
rolling_aggregation const& agg,
rmm::mr::device_memory_resource* mr)
{
auto defaults =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return rolling_window(
input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr);
}

// Applies a variable-size rolling window function to the values in a column.
std::unique_ptr<column> rolling_window(column_view const& input,
column_view const& preceding_window,
Expand Down
131 changes: 118 additions & 13 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,17 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby/sort_helper.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/unary.hpp>
isVoid marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/lists/detail/drop_list_duplicates.hpp>
#include <cudf/rolling.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
Expand All @@ -54,13 +50,12 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/binary_search.h>
#include <thrust/detail/execution_policy.h>
#include <thrust/execution_policy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cuda/std/limits>

#include <memory>

Expand Down Expand Up @@ -279,6 +274,86 @@ struct DeviceRollingCountAll {
}
};

/**
* @brief Operator for applying a VAR rolling aggregation on a single window.
*/
template <typename InputType>
struct DeviceRollingVariance {
size_type const min_periods;
size_type const ddof;

// what operations do we support
template <typename T = InputType, aggregation::Kind O = aggregation::VARIANCE>
static constexpr bool is_supported()
{
return is_fixed_width<InputType>() and not is_chrono<InputType>();
}

DeviceRollingVariance(size_type _min_periods, size_type _ddof)
: min_periods(_min_periods), ddof{_ddof}
{
}

template <typename OutputType, bool has_nulls>
bool __device__ operator()(column_device_view const& input,
column_device_view const&,
mutable_column_device_view& output,
size_type start_index,
size_type end_index,
size_type current_index) const
{
using DeviceInputType = device_storage_type_t<InputType>;

// valid counts in the window
cudf::size_type const count =
has_nulls ? thrust::count_if(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
[&input](auto i) { return input.is_valid_nocheck(i); })
: end_index - start_index;

// Result will be null if any of the following conditions are met:
// - All inputs are null
// - Number of valid inputs is less than `min_periods`
bool output_is_valid = count > 0 and (count >= min_periods);

if (output_is_valid) {
if (count >= ddof) {
// Welford algorithm
// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
OutputType m{0}, m2{0};
size_type running_count{0};

for (size_type i = start_index; i < end_index; i++) {
if (has_nulls and input.is_null_nocheck(i)) { continue; }

OutputType const x = static_cast<OutputType>(input.element<DeviceInputType>(i));

running_count++;
OutputType const tmp1 = x - m;
m += tmp1 / running_count;
OutputType const tmp2 = x - m;
m2 += tmp1 * tmp2;
}
if constexpr (is_fixed_point<InputType>()) {
// For fixed_point types, the previous computed value used unscaled rep-value,
// the final result should be multiplied by the square of decimal `scale`.
OutputType scaleby = exp10(static_cast<double>(input.type().scale()));
scaleby *= scaleby;
output.element<OutputType>(current_index) = m2 / (count - ddof) * scaleby;
} else {
output.element<OutputType>(current_index) = m2 / (count - ddof);
}
} else {
output.element<OutputType>(current_index) =
cuda::std::numeric_limits<OutputType>::signaling_NaN();
}
}

return output_is_valid;
}
};

/**
* @brief Operator for applying a ROW_NUMBER rolling aggregation on a single window.
*/
Expand Down Expand Up @@ -506,6 +581,11 @@ struct corresponding_rolling_operator<InputType, aggregation::Kind::LEAD> {
using type = DeviceRollingLead<InputType>;
};

template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::VARIANCE> {
using type = DeviceRollingVariance<InputType>;
};

template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::LAG> {
using type = DeviceRollingLag<InputType>;
Expand All @@ -527,15 +607,24 @@ struct create_rolling_operator<
InputType,
op,
std::enable_if_t<corresponding_rolling_operator<InputType, op>::type::is_supported()>> {
template <
typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG>* = nullptr>
template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG &&
O != aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const&)
{
return typename corresponding_rolling_operator<InputType, op>::type(min_periods);
}

template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O == aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const& agg)
{
return DeviceRollingVariance<InputType>{
min_periods, dynamic_cast<cudf::detail::var_aggregation const&>(agg)._ddof};
}

template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O == aggregation::Kind::LEAD>* = nullptr>
Expand Down Expand Up @@ -632,6 +721,16 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre
return {};
}

// STD aggregations depends on VARIANCE aggregation. Each element is applied
// with sqaured-root in the finalize() step.
std::vector<std::unique_ptr<aggregation>> visit(data_type,
cudf::detail::std_aggregation const& agg) override
{
std::vector<std::unique_ptr<aggregation>> aggs;
aggs.push_back(make_variance_aggregation(agg._ddof));
return aggs;
}

// LEAD and LAG have custom behaviors for non fixed-width types.
std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, cudf::detail::lead_lag_aggregation const& agg) override
Expand Down Expand Up @@ -750,6 +849,12 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation
lists_column_view(collected_list->view()), agg._nulls_equal, agg._nans_equal, stream, mr);
}

// perform the element-wise square root operation on result of VARIANCE
void visit(cudf::detail::std_aggregation const& agg) override
{
result = detail::unary_operation(intermediate->view(), unary_operator::SQRT, stream, mr);
}

std::unique_ptr<column> get_result()
{
CUDF_EXPECTS(result != nullptr,
Expand Down
Loading