Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support VARIANCE and STD aggregation in rolling op #8809

Merged
merged 40 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7cc6630
initial
isVoid Jul 21, 2021
6f64691
Compiles
isVoid Jul 21, 2021
2ab88fe
style
isVoid Jul 22, 2021
2f63568
clean up
isVoid Jul 23, 2021
16cb7a8
clean up
isVoid Jul 23, 2021
ddd59f0
header cleanup
isVoid Jul 23, 2021
bcd00f0
.
isVoid Jul 23, 2021
da8b755
More cleanup
isVoid Jul 23, 2021
3ff0d8d
revert ptx changes
isVoid Jul 23, 2021
fc00d8f
Static tests
isVoid Jul 23, 2021
1259df6
undo python changes
isVoid Jul 23, 2021
3539e1b
.
isVoid Jul 23, 2021
07aa54d
docs
isVoid Jul 23, 2021
d2a6407
Merge branch 'branch-21.10' of https://github.com/rapidsai/cudf into …
isVoid Jul 25, 2021
885d66e
remove count==1 restriction
isVoid Jul 25, 2021
68ab4ae
add ddof tests
isVoid Jul 25, 2021
1e1c8cd
docfix
isVoid Jul 26, 2021
1288a56
fixed_point fix
isVoid Jul 26, 2021
3a9e589
docs
isVoid Jul 26, 2021
490539c
docfix
isVoid Jul 26, 2021
394f0f0
docfix
isVoid Jul 27, 2021
bc0920b
remove thrust::reduce
isVoid Jul 27, 2021
934f104
.
isVoid Jul 27, 2021
b215199
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
a3af3e9
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
ae33de0
Update cpp/src/rolling/rolling_detail.cuh
isVoid Jul 29, 2021
645a172
address review comments
isVoid Jul 29, 2021
d94b8db
Apply suggestions from code review
isVoid Jul 29, 2021
0fe4a87
Add nan tests
isVoid Jul 30, 2021
9ce41ae
Remove auto generated column test
isVoid Jul 30, 2021
b62a40d
Merge branch 'branch-21.10' of https://github.com/rapidsai/cudf into …
isVoid Aug 19, 2021
43df716
count==0 case maps to invalid output.
isVoid Aug 21, 2021
d62eb00
Apply review comments: div by zero result is valid element
isVoid Aug 25, 2021
0b78ab3
Update cpp/src/rolling/rolling_detail.cuh
isVoid Aug 25, 2021
e3f89df
ddof > count situation is valid but nan
isVoid Aug 30, 2021
a504017
Merge branch 'rolling_std' of github.com:isVoid/cudf into rolling_std
isVoid Aug 30, 2021
d3cedb1
make operator constant
isVoid Aug 31, 2021
75e8140
header cleanup
isVoid Sep 1, 2021
d5359ba
Update cpp/tests/rolling/rolling_test.cpp
isVoid Sep 1, 2021
a14d77e
style
isVoid Sep 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class rolling_aggregation : public virtual aggregation {

protected:
rolling_aggregation() {}
rolling_aggregation(aggregation::Kind a) : aggregation{a} {}
};

enum class udf_type : bool { CUDA, PTX };
Expand Down
14 changes: 10 additions & 4 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class m2_aggregation : public aggregation {
/**
* @brief Derived class for specifying a standard deviation/variance aggregation
*/
class std_var_aggregation : public aggregation {
class std_var_aggregation : public rolling_aggregation {
isVoid marked this conversation as resolved.
Show resolved Hide resolved
public:
size_type _ddof; ///< Delta degrees of freedom

Expand All @@ -328,7 +328,7 @@ class std_var_aggregation : public aggregation {
size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); }

protected:
std_var_aggregation(aggregation::Kind k, size_type ddof) : aggregation(k), _ddof{ddof}
std_var_aggregation(aggregation::Kind k, size_type ddof) : rolling_aggregation(k), _ddof{ddof}
{
CUDF_EXPECTS(k == aggregation::STD or k == aggregation::VARIANCE,
"std_var_aggregation can accept only STD, VARIANCE");
Expand All @@ -342,7 +342,10 @@ class std_var_aggregation : public aggregation {
*/
class var_aggregation final : public std_var_aggregation {
public:
var_aggregation(size_type ddof) : std_var_aggregation{aggregation::VARIANCE, ddof} {}
var_aggregation(size_type ddof)
: aggregation(aggregation::VARIANCE), std_var_aggregation{aggregation::VARIANCE, ddof}
{
}

std::unique_ptr<aggregation> clone() const override
{
Expand All @@ -361,7 +364,10 @@ class var_aggregation final : public std_var_aggregation {
*/
class std_aggregation final : public std_var_aggregation {
public:
std_aggregation(size_type ddof) : std_var_aggregation{aggregation::STD, ddof} {}
std_aggregation(size_type ddof)
: aggregation(aggregation::STD), std_var_aggregation{aggregation::STD, ddof}
{
}

std::unique_ptr<aggregation> clone() const override
{
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ std::unique_ptr<Base> make_variance_aggregation(size_type ddof)
return std::make_unique<detail::var_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_variance_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_variance_aggregation<rolling_aggregation>(
size_type ddof);

/// Factory to create a STD aggregation
template <typename Base>
Expand All @@ -436,6 +438,8 @@ std::unique_ptr<Base> make_std_aggregation(size_type ddof)
return std::make_unique<detail::std_aggregation>(ddof);
}
template std::unique_ptr<aggregation> make_std_aggregation<aggregation>(size_type ddof);
template std::unique_ptr<rolling_aggregation> make_std_aggregation<rolling_aggregation>(
size_type ddof);

/// Factory to create a MEDIAN aggregation
template <typename Base>
Expand Down
121 changes: 117 additions & 4 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby/sort_helper.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/unary.hpp>
isVoid marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
Expand All @@ -54,12 +55,14 @@

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/binary_search.h>
#include <thrust/detail/execution_policy.h>
#include <thrust/execution_policy.h>
#include <thrust/find.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/pair.h>
#include <thrust/transform.h>

#include <memory>
Expand Down Expand Up @@ -279,6 +282,86 @@ struct DeviceRollingCountAll {
}
};

/**
* @brief Operator for applying a VAR rolling aggregation on a single window.
*/
template <typename InputType>
struct DeviceRollingVariance {
size_type min_periods;
size_type ddof;
isVoid marked this conversation as resolved.
Show resolved Hide resolved

// what operations do we support
template <typename T = InputType, aggregation::Kind O = aggregation::VARIANCE>
static constexpr bool is_supported()
{
return is_fixed_width<InputType>() and not is_chrono<InputType>();
}

DeviceRollingVariance(size_type _min_periods, size_type _ddof)
: min_periods(_min_periods), ddof{_ddof}
{
}

template <typename OutputType, bool has_nulls>
bool __device__ operator()(column_device_view const& input,
column_device_view const&,
mutable_column_device_view& output,
size_type start_index,
size_type end_index,
size_type current_index)
isVoid marked this conversation as resolved.
Show resolved Hide resolved
{
using DeviceInputType = device_storage_type_t<InputType>;

cudf::size_type count{0}; // valid counts in the window

// Count valid observations in window
if (has_nulls) {
count = thrust::count_if(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
[&input](auto i) { return input.is_valid_nocheck(i); });
} else {
count = end_index - start_index;
}
isVoid marked this conversation as resolved.
Show resolved Hide resolved

// Variance/Std is non-negative, thus ddof should be strictly less than valid counts.
// Variance/Std of a lone value is statistically meaningless
bool output_is_valid = (count >= min_periods) and not(count == 1) and not(count <= ddof);

if (output_is_valid) {
// Welford algorithm, a numerically stable, single pass algorithm
// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
OutputType m, m2;
size_type running_count;

thrust::tie(running_count, m, m2) =
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
thrust::reduce(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
thrust::make_tuple(size_type{0}, OutputType{0}, OutputType{0}),
[&](auto acc, auto i) {
if (has_nulls and input.is_null_nocheck(i)) { return acc; }

OutputType m_acc, m2_acc, tmp1, tmp2;
size_type r_count_acc;
thrust::tie(r_count_acc, m_acc, m2_acc) = acc;
OutputType x = static_cast<OutputType>(input.element<DeviceInputType>(i));

r_count_acc++;
tmp1 = x - m_acc;
m_acc += tmp1 / r_count_acc;
tmp2 = x - m_acc;
m2_acc += tmp1 * tmp2;
return thrust::make_tuple(r_count_acc, m_acc, m2_acc);
});

output.element<OutputType>(current_index) = m2 / (count - ddof);
}

return output_is_valid;
}
};

/**
* @brief Operator for applying a ROW_NUMBER rolling aggregation on a single window.
*/
Expand Down Expand Up @@ -506,6 +589,11 @@ struct corresponding_rolling_operator<InputType, aggregation::Kind::LEAD> {
using type = DeviceRollingLead<InputType>;
};

// Specialization selecting DeviceRollingVariance as the rolling operator type used
// for the VARIANCE aggregation kind.
template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::VARIANCE> {
using type = DeviceRollingVariance<InputType>;
};

template <typename InputType>
struct corresponding_rolling_operator<InputType, aggregation::Kind::LAG> {
using type = DeviceRollingLag<InputType>;
Expand All @@ -527,15 +615,24 @@ struct create_rolling_operator<
InputType,
op,
std::enable_if_t<corresponding_rolling_operator<InputType, op>::type::is_supported()>> {
template <
typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG>* = nullptr>
template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O != aggregation::Kind::LEAD && O != aggregation::Kind::LAG &&
O != aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const&)
{
return typename corresponding_rolling_operator<InputType, op>::type(min_periods);
}

// Overload enabled only for VARIANCE: besides `min_periods`, the operator needs the
// `_ddof` stored on the concrete var_aggregation. The reference dynamic_cast throws
// std::bad_cast if `agg` is not a var_aggregation.
template <typename T            = InputType,
          aggregation::Kind O   = op,
          std::enable_if_t<O == aggregation::Kind::VARIANCE>* = nullptr>
auto operator()(size_type min_periods, rolling_aggregation const& agg)
{
  auto const& variance_agg = dynamic_cast<cudf::detail::var_aggregation const&>(agg);
  return DeviceRollingVariance<InputType>{min_periods, variance_agg._ddof};
}

template <typename T = InputType,
aggregation::Kind O = op,
std::enable_if_t<O == aggregation::Kind::LEAD>* = nullptr>
Expand Down Expand Up @@ -632,6 +729,16 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre
return {};
}

// A STD aggregation depends on a VARIANCE aggregation with the same ddof, so only
// the VARIANCE aggregation is requested here. The element-wise square root is
// applied to its result in the finalize() step.
std::vector<std::unique_ptr<aggregation>> visit(data_type,
                                                cudf::detail::std_aggregation const& agg) override
{
  std::vector<std::unique_ptr<aggregation>> variance_only;
  variance_only.push_back(make_variance_aggregation(agg._ddof));
  return variance_only;
}

// LEAD and LAG have custom behaviors for non fixed-width types.
std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, cudf::detail::lead_lag_aggregation const& agg) override
Expand Down Expand Up @@ -750,6 +857,12 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation
lists_column_view(collected_list->view()), agg._nulls_equal, agg._nans_equal, stream, mr);
}

// STD result: take the element-wise square root of the intermediate column
// (the result of the VARIANCE aggregation).
void visit(cudf::detail::std_aggregation const& agg) override
{
  auto const variance_view = intermediate->view();
  result = detail::unary_operation(variance_view, unary_operator::SQRT, stream, mr);
}

std::unique_ptr<column> get_result()
{
CUDF_EXPECTS(result != nullptr,
Expand Down
Loading