Skip to content

Commit

Permalink
Merge pull request #13020 from rapidsai/branch-23.04
Browse files Browse the repository at this point in the history
Forward-merge branch-23.04 to branch-23.06
  • Loading branch information
GPUtester authored Mar 27, 2023
2 parents 7e7dc9b + 173fde9 commit c01c12b
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 27 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ add_library(
src/reductions/segmented/max.cu
src/reductions/segmented/mean.cu
src/reductions/segmented/min.cu
src/reductions/segmented/nunique.cu
src/reductions/segmented/product.cu
src/reductions/segmented/reductions.cpp
src/reductions/segmented/std.cu
Expand Down
2 changes: 1 addition & 1 deletion cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ ConfigureBench(
)
ConfigureNVBench(
REDUCTION_NVBENCH reduction/distinct_count.cpp reduction/rank.cpp reduction/scan_structs.cpp
reduction/segment_reduce.cu
reduction/segmented_reduce.cpp
)

# ##################################################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,31 @@

#include <cudf/aggregation.hpp>
#include <cudf/column/column.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/filling.hpp>
#include <cudf/reduction.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <thrust/device_vector.h>

#include <memory>
#include <type_traits>

bool constexpr is_boolean_output_agg(cudf::segmented_reduce_aggregation::Kind kind)
{
return kind == cudf::segmented_reduce_aggregation::ALL ||
kind == cudf::segmented_reduce_aggregation::ANY;
}

bool constexpr is_float_output_agg(cudf::segmented_reduce_aggregation::Kind kind)
{
return kind == cudf::segmented_reduce_aggregation::MEAN ||
kind == cudf::segmented_reduce_aggregation::VARIANCE ||
kind == cudf::segmented_reduce_aggregation::STD;
}

template <cudf::segmented_reduce_aggregation::Kind kind>
std::unique_ptr<cudf::segmented_reduce_aggregation> make_simple_aggregation()
std::unique_ptr<cudf::segmented_reduce_aggregation> make_reduce_aggregation()
{
switch (kind) {
case cudf::segmented_reduce_aggregation::SUM:
Expand All @@ -54,12 +59,22 @@ std::unique_ptr<cudf::segmented_reduce_aggregation> make_simple_aggregation()
return cudf::make_all_aggregation<cudf::segmented_reduce_aggregation>();
case cudf::segmented_reduce_aggregation::ANY:
return cudf::make_any_aggregation<cudf::segmented_reduce_aggregation>();
default: CUDF_FAIL("Unsupported simple segmented aggregation");
case cudf::segmented_reduce_aggregation::SUM_OF_SQUARES:
return cudf::make_sum_of_squares_aggregation<cudf::segmented_reduce_aggregation>();
case cudf::segmented_reduce_aggregation::MEAN:
return cudf::make_mean_aggregation<cudf::segmented_reduce_aggregation>();
case cudf::segmented_reduce_aggregation::VARIANCE:
return cudf::make_variance_aggregation<cudf::segmented_reduce_aggregation>();
case cudf::segmented_reduce_aggregation::STD:
return cudf::make_std_aggregation<cudf::segmented_reduce_aggregation>();
case cudf::segmented_reduce_aggregation::NUNIQUE:
return cudf::make_nunique_aggregation<cudf::segmented_reduce_aggregation>();
default: CUDF_FAIL("Unsupported segmented reduce aggregation in this benchmark");
}
}

template <typename DataType>
std::pair<std::unique_ptr<cudf::column>, thrust::device_vector<cudf::size_type>> make_test_data(
std::pair<std::unique_ptr<cudf::column>, std::unique_ptr<cudf::column>> make_test_data(
nvbench::state& state)
{
auto const column_size{cudf::size_type(state.get_int64("column_size"))};
Expand All @@ -72,28 +87,30 @@ std::pair<std::unique_ptr<cudf::column>, thrust::device_vector<cudf::size_type>>
dtype, distribution_id::UNIFORM, 0, 100);
auto input = create_random_column(dtype, row_count{column_size}, profile);

auto offset_it = cudf::detail::make_counting_transform_iterator(
0, [column_size, segment_length] __device__(auto i) {
return column_size < i * segment_length ? column_size : i * segment_length;
});

thrust::device_vector<cudf::size_type> d_offsets(offset_it, offset_it + num_segments + 1);

return std::pair(std::move(input), d_offsets);
auto offsets = cudf::sequence(num_segments + 1,
cudf::numeric_scalar<cudf::size_type>(0),
cudf::numeric_scalar<cudf::size_type>(segment_length));
return std::pair(std::move(input), std::move(offsets));
}

template <typename DataType, cudf::aggregation::Kind kind>
void BM_Simple_Segmented_Reduction(nvbench::state& state,
nvbench::type_list<DataType, nvbench::enum_type<kind>>)
void BM_Segmented_Reduction(nvbench::state& state,
nvbench::type_list<DataType, nvbench::enum_type<kind>>)
{
auto const column_size{cudf::size_type(state.get_int64("column_size"))};
auto const num_segments{cudf::size_type(state.get_int64("num_segments"))};

auto [input, offsets] = make_test_data<DataType>(state);
auto agg = make_simple_aggregation<kind>();
auto agg = make_reduce_aggregation<kind>();

auto output_type = is_boolean_output_agg(kind) ? cudf::data_type{cudf::type_id::BOOL8}
: cudf::data_type{cudf::type_to_id<DataType>()};
auto const output_type = [] {
if (is_boolean_output_agg(kind)) { return cudf::data_type{cudf::type_id::BOOL8}; }
if (is_float_output_agg(kind)) { return cudf::data_type{cudf::type_id::FLOAT64}; }
if (kind == cudf::segmented_reduce_aggregation::NUNIQUE) {
return cudf::data_type{cudf::type_to_id<cudf::size_type>()};
}
return cudf::data_type{cudf::type_to_id<DataType>()};
}();

state.add_element_count(column_size);
state.add_global_memory_reads<DataType>(column_size);
Expand All @@ -103,8 +120,10 @@ void BM_Simple_Segmented_Reduction(nvbench::state& state,
state.add_global_memory_writes<DataType>(num_segments);
}

auto const input_view = input->view();
auto const offset_span = cudf::device_span<cudf::size_type>{offsets};
auto const input_view = input->view();
auto const offsets_view = offsets->view();
auto const offset_span = cudf::device_span<cudf::size_type const>{
offsets_view.template data<cudf::size_type>(), static_cast<std::size_t>(offsets_view.size())};

state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(
Expand All @@ -115,13 +134,17 @@ void BM_Simple_Segmented_Reduction(nvbench::state& state,

using Types = nvbench::type_list<bool, int32_t, float, double>;
// Skip benchmarking MAX/ANY since they are covered by MIN/ALL respectively.
// Also VARIANCE includes STD calculation.
using AggKinds = nvbench::enum_type_list<cudf::aggregation::SUM,
cudf::aggregation::PRODUCT,
cudf::aggregation::MIN,
cudf::aggregation::ALL>;
cudf::aggregation::ALL,
cudf::aggregation::MEAN,
cudf::aggregation::VARIANCE,
cudf::aggregation::NUNIQUE>;

NVBENCH_BENCH_TYPES(BM_Simple_Segmented_Reduction, NVBENCH_TYPE_AXES(Types, AggKinds))
.set_name("segmented_reduction_simple")
NVBENCH_BENCH_TYPES(BM_Segmented_Reduction, NVBENCH_TYPE_AXES(Types, AggKinds))
.set_name("segmented_reduction")
.set_type_axes_names({"DataType", "AggregationKinds"})
.add_int64_axis("column_size", {100'000, 1'000'000, 10'000'000, 100'000'000})
.add_int64_axis("num_segments", {1'000, 10'000, 100'000});
4 changes: 3 additions & 1 deletion cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,9 @@ class argmin_aggregation final : public rolling_aggregation, public groupby_aggr
/**
* @brief Derived class for specifying a nunique aggregation
*/
class nunique_aggregation final : public groupby_aggregation, public reduce_aggregation {
class nunique_aggregation final : public groupby_aggregation,
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
nunique_aggregation(null_policy null_handling)
: aggregation{NUNIQUE}, _null_handling{null_handling}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,34 @@ std::unique_ptr<column> segmented_variance(column_view const& col,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Counts the number of unique values within each segment of a column
*
* Unique entries are counted by comparing adjacent values so the column segments
* are expected to be sorted before calling this function otherwise the results
* are undefined.
*
* If any input segment is empty, that segment's result is null.
*
* If `null_handling==null_policy::INCLUDE`, the segment count is the number of
* unique values +1 which includes all the null entries in that segment.
* If `null_handling==null_policy::EXCLUDE`, the segment count does not include nulls.
*
* @throw cudf::logic_error if input column type is a nested type
*
* @param col Input column data
* @param offsets Indices to identify segment boundaries within input `col`
* @param null_handling Specifies how null elements are processed for each segment
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Column of unique counts per segment
*/
std::unique_ptr<column> segmented_nunique(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace reduction
} // namespace cudf
2 changes: 2 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ template std::unique_ptr<groupby_aggregation> make_nunique_aggregation<groupby_a
null_policy null_handling);
template std::unique_ptr<reduce_aggregation> make_nunique_aggregation<reduce_aggregation>(
null_policy null_handling);
template std::unique_ptr<segmented_reduce_aggregation>
make_nunique_aggregation<segmented_reduce_aggregation>(null_policy null_handling);

/// Factory to create an NTH_ELEMENT aggregation
template <typename Base>
Expand Down
113 changes: 113 additions & 0 deletions cpp/src/reductions/segmented/nunique.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "update_validity.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/labeling/label_segments.cuh>
#include <cudf/reduction/detail/segmented_reduction.cuh>
#include <cudf/reduction/detail/segmented_reduction_functions.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

namespace cudf {
namespace reduction {
namespace detail {
namespace {
template <typename ComparatorType>
struct is_unique_fn {
column_device_view const d_col;
ComparatorType row_equal;
null_policy null_handling;
size_type const* offsets;
size_type const* labels;

__device__ size_type operator()(size_type idx) const
{
if (null_handling == null_policy::EXCLUDE && d_col.is_null(idx)) { return 0; }
return static_cast<size_type>(offsets[labels[idx]] == idx || (!row_equal(idx, idx - 1)));
}
};
} // namespace

std::unique_ptr<cudf::column> segmented_nunique(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// only support non-nested types
CUDF_EXPECTS(!cudf::is_nested(col.type()),
"segmented reduce nunique only supports non-nested column types");

// compute the unique identifiers within each segment
auto const identifiers = [&] {
auto const d_col = column_device_view::create(col, stream);
auto const comparator =
cudf::experimental::row::equality::self_comparator{table_view({col}), stream};
auto const row_equal =
comparator.equal_to<false>(cudf::nullate::DYNAMIC{col.has_nulls()}, null_equality::EQUAL);

auto labels = rmm::device_uvector<size_type>(col.size(), stream);
cudf::detail::label_segments(
offsets.begin(), offsets.end(), labels.begin(), labels.end(), stream);
auto fn = is_unique_fn<decltype(row_equal)>{
*d_col, row_equal, null_handling, offsets.data(), labels.data()};

auto identifiers = rmm::device_uvector<size_type>(col.size(), stream);
thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(col.size()),
identifiers.begin(),
fn);
return identifiers;
}();

auto result = cudf::make_numeric_column(data_type(type_to_id<size_type>()),
static_cast<size_type>(offsets.size() - 1),
cudf::mask_state::UNALLOCATED,
stream,
mr);

// Sum the unique identifiers within each segment
auto add_op = op::sum{};
cudf::reduction::detail::segmented_reduce(identifiers.begin(),
offsets.begin(),
offsets.end(),
result->mutable_view().data<size_type>(),
add_op.get_binary_op(),
0,
stream);

// Compute the output null mask
// - only empty segments are tagged as null
// - nulls are counted appropriately above per null_handling policy
auto const bitmask_col = null_handling == null_policy::EXCLUDE ? col : result->view();
cudf::reduction::detail::segmented_update_validity(
*result, bitmask_col, offsets, null_policy::EXCLUDE, std::nullopt, stream, mr);

return result;
}
} // namespace detail
} // namespace reduction
} // namespace cudf
2 changes: 2 additions & 0 deletions cpp/src/reductions/segmented/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ struct segmented_reduce_dispatch_functor {
return segmented_standard_deviation(
col, offsets, output_dtype, null_handling, var_agg._ddof, stream, mr);
}
case segmented_reduce_aggregation::NUNIQUE:
return segmented_nunique(col, offsets, null_handling, stream, mr);
default: CUDF_FAIL("Unsupported aggregation type.");
}
}
Expand Down
42 changes: 42 additions & 0 deletions cpp/tests/reductions/segmented_reduction_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,48 @@ TEST_F(SegmentedReductionTestUntyped, VarianceNulls)
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected);
}

TEST_F(SegmentedReductionTestUntyped, NUnique)
{
auto const input =
cudf::test::fixed_width_column_wrapper<int32_t>({10, 15, 20, 30, 60, 60, 70, 70, 80});
auto const offsets = std::vector<cudf::size_type>{0, 1, 1, 2, 4, 9};
auto const d_offsets = cudf::detail::make_device_uvector_async(
offsets, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto const agg = cudf::make_nunique_aggregation<cudf::segmented_reduce_aggregation>();
auto const output_type = cudf::data_type{cudf::type_id::INT32};

auto expected =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{{1, 0, 1, 2, 3}, {1, 0, 1, 1, 1}};
auto result =
cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::EXCLUDE);
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected);

result = cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::INCLUDE);
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected);
}

TEST_F(SegmentedReductionTestUntyped, NUniqueNulls)
{
auto const input = cudf::test::fixed_width_column_wrapper<int32_t>(
{10, 0, 20, 30, 60, 60, 70, 70, 0}, {1, 0, 1, 1, 1, 1, 1, 1, 0});
auto const offsets = std::vector<cudf::size_type>{0, 1, 1, 2, 4, 9};
auto const d_offsets = cudf::detail::make_device_uvector_async(
offsets, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto const agg = cudf::make_nunique_aggregation<cudf::segmented_reduce_aggregation>();
auto const output_type = cudf::data_type{cudf::type_id::INT32};

auto expected =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{{1, 0, 0, 2, 2}, {1, 0, 0, 1, 1}};
auto result =
cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::EXCLUDE);
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected);

expected =
cudf::test::fixed_width_column_wrapper<cudf::size_type>{{1, 0, 1, 2, 3}, {1, 0, 1, 1, 1}};
result = cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::INCLUDE);
CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected);
}

TEST_F(SegmentedReductionTestUntyped, Errors)
{
auto const input = cudf::test::fixed_width_column_wrapper<int32_t>(
Expand Down

0 comments on commit c01c12b

Please sign in to comment.