Skip to content

Commit

Permalink
Add compound aggregations to cudf::segmented_reduce (#12573)
Browse files Browse the repository at this point in the history
Adds mean, variance, and standard deviation aggregation support to `cudf::segmented_reduce`. These are compound (multi-step) aggregations and are modeled after the same aggregations supported but `cudf::reduce`. Once this approved and merged, the visitor pattern for this approach will be reworked for both `cudf::reduce` and `cudf::segmented_reduce` as per [#10432](#10432 (comment)).

The source tree for `src/reductions` has been adjusted to put all segmented-reduce source files into `src/reductions/segmented` and removing the `segmented_` prefix from those file names.
Also, the segmented-reduce functions have been moved from `cudf/detail/reduction_functions.hpp` into their own `cudf/detail/segmented_reduction_functions.hpp`. Likewise, the segmented-reduce CUB calls have been moved from `cudf/detail/reduction.cuh` to the new `cudf/detail/segmented_reduction.cuh` to help minimize including CUB headers.

Additionally, the sum-of-squares aggregation is also included since it was a simple reduction only requiring the appropriate aggregation class registration and source file.

Finally, gtests are added for these new types. The compound types only support floating-point outputs.

Follow on PRs will address the visitor pattern already mentioned above as well as additional data types. Discussion on additional aggregations will occur in the reference issue #10432.

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Bradley Dice (https://github.com/bdice)

URL: #12573
  • Loading branch information
davidwendt authored Feb 3, 2023
1 parent 21ef256 commit 182ee2c
Show file tree
Hide file tree
Showing 29 changed files with 1,318 additions and 365 deletions.
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ outputs:
- test -f $PREFIX/include/cudf/detail/scan.hpp
- test -f $PREFIX/include/cudf/detail/scatter.hpp
- test -f $PREFIX/include/cudf/detail/search.hpp
- test -f $PREFIX/include/cudf/detail/segmented_reduction_functions.hpp
- test -f $PREFIX/include/cudf/detail/sequence.hpp
- test -f $PREFIX/include/cudf/detail/sorting.hpp
- test -f $PREFIX/include/cudf/detail/stream_compaction.hpp
Expand Down
19 changes: 12 additions & 7 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,18 @@ add_library(
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
src/reductions/segmented_all.cu
src/reductions/segmented_any.cu
src/reductions/segmented_max.cu
src/reductions/segmented_min.cu
src/reductions/segmented_product.cu
src/reductions/segmented_reductions.cpp
src/reductions/segmented_sum.cu
src/reductions/segmented/all.cu
src/reductions/segmented/any.cu
src/reductions/segmented/max.cu
src/reductions/segmented/mean.cu
src/reductions/segmented/min.cu
src/reductions/segmented/product.cu
src/reductions/segmented/reductions.cpp
src/reductions/segmented/std.cu
src/reductions/segmented/sum.cu
src/reductions/segmented/sum_of_squares.cu
src/reductions/segmented/update_validity.cu
src/reductions/segmented/var.cu
src/reductions/std.cu
src/reductions/sum.cu
src/reductions/sum_of_squares.cu
Expand Down
10 changes: 7 additions & 3 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ class all_aggregation final : public reduce_aggregation, public segmented_reduce
/**
* @brief Derived class for specifying a sum_of_squares aggregation
*/
class sum_of_squares_aggregation final : public groupby_aggregation, public reduce_aggregation {
class sum_of_squares_aggregation final : public groupby_aggregation,
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
sum_of_squares_aggregation() : aggregation(SUM_OF_SQUARES) {}

Expand All @@ -313,7 +315,8 @@ class sum_of_squares_aggregation final : public groupby_aggregation, public redu
*/
class mean_aggregation final : public rolling_aggregation,
public groupby_aggregation,
public reduce_aggregation {
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
mean_aggregation() : aggregation(MEAN) {}

Expand Down Expand Up @@ -353,7 +356,8 @@ class m2_aggregation : public groupby_aggregation {
*/
class std_var_aggregation : public rolling_aggregation,
public groupby_aggregation,
public reduce_aggregation {
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
size_type _ddof; ///< Delta degrees of freedom

Expand Down
91 changes: 2 additions & 89 deletions cpp/include/cudf/detail/reduction.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

#pragma once

#include "reduction_operators.cuh"
#include <cudf/detail/reduction_operators.cuh>

#include <cudf/column/column_factories.hpp>
#include <cudf/utilities/type_dispatcher.hpp>
Expand All @@ -27,7 +27,6 @@
#include <rmm/exec_policy.hpp>

#include <cub/device/device_reduce.cuh>
#include <cub/device/device_segmented_reduce.cuh>

#include <thrust/for_each.h>
#include <thrust/iterator/iterator_traits.h>
Expand Down Expand Up @@ -229,92 +228,6 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
return std::unique_ptr<scalar>(result);
}

/**
* @brief Compute the specified simple reduction over each of the segments in the
* input range of elements.
*
* @tparam InputIterator the input column iterator
* @tparam OffsetIterator the offset column iterator
* @tparam OutputIterator the output column iterator
* @tparam BinaryOp the device binary operator used to reduce
* @tparam OutputType the output type of reduction
*
* @param[in] d_in the begin iterator to input
* @param[in] d_offset_begin the begin iterator to offset
* @param[in] d_offset_end the end iterator to offset. Note: This is
* num_segments+1 elements past `d_offset_begin`.
* @param[out] d_out the begin iterator to output
* @param[in] binary_op the reduction operator
* @param[in] identity the identity element of the reduction operator
* @param[in] initial_value Initial value of the reduction
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*
*/
template <typename InputIterator,
typename OffsetIterator,
typename OutputIterator,
typename BinaryOp,
typename OutputType = typename thrust::iterator_value<OutputIterator>::type,
typename std::enable_if_t<is_fixed_width<OutputType>() &&
!cudf::is_fixed_point<OutputType>()>* = nullptr>
void segmented_reduce(InputIterator d_in,
OffsetIterator d_offset_begin,
OffsetIterator d_offset_end,
OutputIterator d_out,
BinaryOp binary_op,
OutputType initial_value,
rmm::cuda_stream_view stream)
{
auto const num_segments = static_cast<size_type>(std::distance(d_offset_begin, d_offset_end)) - 1;

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
size_t temp_storage_bytes = 0;
cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(),
temp_storage_bytes,
d_in,
d_out,
num_segments,
d_offset_begin,
d_offset_begin + 1,
binary_op,
initial_value,
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

// Run reduction
cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(),
temp_storage_bytes,
d_in,
d_out,
num_segments,
d_offset_begin,
d_offset_begin + 1,
binary_op,
initial_value,
stream.value());
}

template <typename InputIterator,
typename OffsetIterator,
typename OutputIterator,
typename BinaryOp,
typename OutputType = typename thrust::iterator_value<OutputIterator>::type,
typename std::enable_if_t<!(is_fixed_width<OutputType>() &&
!cudf::is_fixed_point<OutputType>())>* = nullptr>
void segmented_reduce(InputIterator,
OffsetIterator,
OffsetIterator,
OutputIterator,
BinaryOp,
OutputType,
rmm::cuda_stream_view)
{
CUDF_FAIL(
"Unsupported data types called on segmented_reduce. Only numeric and chrono types are "
"supported.");
}

} // namespace detail
} // namespace reduction
} // namespace cudf
172 changes: 3 additions & 169 deletions cpp/include/cudf/detail/reduction_functions.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,7 +216,7 @@ std::unique_ptr<scalar> mean(
std::unique_ptr<scalar> variance(
column_view const& col,
data_type const output_dtype,
cudf::size_type ddof,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -239,7 +239,7 @@ std::unique_ptr<scalar> variance(
std::unique_ptr<scalar> standard_deviation(
column_view const& col,
data_type const output_dtype,
cudf::size_type ddof,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand Down Expand Up @@ -338,171 +338,5 @@ std::unique_ptr<scalar> merge_sets(
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Compute sum of each segment in input column.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is not convertible to `output_dtype`.
* @throw cudf::logic_error if `output_dtype` is not an arithmetic type.
*
* @param col Input column to compute sum
* @param offsets Indices to identify segment boundaries
* @param output_dtype Data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each sum
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Sums of segments in type `output_dtype`
*/
std::unique_ptr<column> segmented_sum(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Computes product of each segment in input column.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is not convertible to `output_dtype`.
* @throw cudf::logic_error if `output_dtype` is not an arithmetic type.
*
* @param col Input column to compute product
* @param offsets Indices to identify segment boundaries
* @param output_dtype data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each product
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return Product as scalar of type `output_dtype`
*/
std::unique_ptr<column> segmented_product(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Compute minimum of each segment in input column.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is convertible to `output_dtype`.
*
* @param col Input column to compute minimum
* @param offsets Indices to identify segment boundaries
* @param output_dtype Data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each minimum
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return Minimums of segments in type `output_dtype`
*/
std::unique_ptr<column> segmented_min(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Compute maximum of each segment in input column.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is convertible to `output_dtype`.
*
* @param col Input column to compute maximum
* @param offsets Indices to identify segment boundaries
* @param output_dtype Data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each maximum
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return Maximums of segments in type `output_dtype`
*/
std::unique_ptr<column> segmented_max(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Compute if any of the values in the segment are true when typecasted to bool.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is not convertible to bool.
* @throw cudf::logic_error if `output_dtype` is not bool8.
*
* @param col Input column to compute any
* @param offsets Indices to identify segment boundaries
* @param output_dtype Data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each any
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return Column of bool8 for the results of the segments
*/
std::unique_ptr<column> segmented_any(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Compute if all of the values in the segment are true when typecasted to bool.
*
* If an input segment is empty, the segment result is null.
*
* @throw cudf::logic_error if input column type is not convertible to bool.
* @throw cudf::logic_error if `output_dtype` is not bool8.
*
* @param col Input column to compute all
* @param offsets Indices to identify segment boundaries
* @param output_dtype Data type of return type and typecast elements of input column
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduced value is valid if any element
* in the segment is valid.
* @param init Initial value of each all
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return Column of bool8 for the results of the segments
*/
std::unique_ptr<column> segmented_all(
column_view const& col,
device_span<size_type const> offsets,
data_type const output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace reduction
} // namespace cudf
Loading

0 comments on commit 182ee2c

Please sign in to comment.