From 173fde9d9ae335014a1aa6de60b417f6f245dcf2 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Mon, 27 Mar 2023 19:36:13 -0400 Subject: [PATCH] Add nunique aggregation support for cudf::segmented_reduce (#12972) Adds support for `NUNIQUE` aggregation type for `cudf::segmented_reduce`. This computes the number of unique elements within each segment specified. Due to the overhead of sorting, the segments must be sorted before calling this function otherwise the results are undefined. Also, only non-nested column types are supported as well. Reference #10432 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Divye Gala (https://github.com/divyegala) - Karthikeyan (https://github.com/karthikeyann) - Bradley Dice (https://github.com/bdice) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/12972 --- cpp/CMakeLists.txt | 1 + cpp/benchmarks/CMakeLists.txt | 2 +- ...segment_reduce.cu => segmented_reduce.cpp} | 73 +++++++---- .../cudf/detail/aggregation/aggregation.hpp | 4 +- .../detail/segmented_reduction_functions.hpp | 28 +++++ cpp/src/aggregation/aggregation.cpp | 2 + cpp/src/reductions/segmented/nunique.cu | 113 ++++++++++++++++++ cpp/src/reductions/segmented/reductions.cpp | 2 + .../reductions/segmented_reduction_tests.cpp | 42 +++++++ 9 files changed, 240 insertions(+), 27 deletions(-) rename cpp/benchmarks/reduction/{segment_reduce.cu => segmented_reduce.cpp} (58%) create mode 100644 cpp/src/reductions/segmented/nunique.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 13583378134..127df03c54d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -464,6 +464,7 @@ add_library( src/reductions/segmented/max.cu src/reductions/segmented/mean.cu src/reductions/segmented/min.cu + src/reductions/segmented/nunique.cu src/reductions/segmented/product.cu src/reductions/segmented/reductions.cpp src/reductions/segmented/std.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index e01d7745e94..b9c15e244de 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -192,7 +192,7 @@ ConfigureBench( ) ConfigureNVBench( REDUCTION_NVBENCH reduction/distinct_count.cpp reduction/rank.cpp reduction/scan_structs.cpp - reduction/segment_reduce.cu + reduction/segmented_reduce.cpp ) # ################################################################################################## diff --git a/cpp/benchmarks/reduction/segment_reduce.cu b/cpp/benchmarks/reduction/segmented_reduce.cpp similarity index 58% rename from cpp/benchmarks/reduction/segment_reduce.cu rename to cpp/benchmarks/reduction/segmented_reduce.cpp index 127b3598dae..590a014ad76 100644 --- a/cpp/benchmarks/reduction/segment_reduce.cu +++ b/cpp/benchmarks/reduction/segmented_reduce.cpp @@ -20,17 +20,15 @@ #include #include -#include +#include #include +#include #include #include #include -#include - #include -#include bool constexpr is_boolean_output_agg(cudf::segmented_reduce_aggregation::Kind kind) { @@ -38,8 +36,15 @@ bool constexpr is_boolean_output_agg(cudf::segmented_reduce_aggregation::Kind ki kind == cudf::segmented_reduce_aggregation::ANY; } +bool constexpr is_float_output_agg(cudf::segmented_reduce_aggregation::Kind kind) +{ + return kind == cudf::segmented_reduce_aggregation::MEAN || + kind == cudf::segmented_reduce_aggregation::VARIANCE || + kind == cudf::segmented_reduce_aggregation::STD; +} + template -std::unique_ptr make_simple_aggregation() +std::unique_ptr make_reduce_aggregation() { switch (kind) { case cudf::segmented_reduce_aggregation::SUM: @@ -54,12 +59,22 @@ std::unique_ptr make_simple_aggregation() return cudf::make_all_aggregation(); case cudf::segmented_reduce_aggregation::ANY: return cudf::make_any_aggregation(); - default: CUDF_FAIL("Unsupported simple segmented aggregation"); + case cudf::segmented_reduce_aggregation::SUM_OF_SQUARES: + return cudf::make_sum_of_squares_aggregation(); + case cudf::segmented_reduce_aggregation::MEAN: + return cudf::make_mean_aggregation(); + case cudf::segmented_reduce_aggregation::VARIANCE: + return cudf::make_variance_aggregation(); + case cudf::segmented_reduce_aggregation::STD: + return cudf::make_std_aggregation(); + case cudf::segmented_reduce_aggregation::NUNIQUE: + return cudf::make_nunique_aggregation(); + default: CUDF_FAIL("Unsupported segmented reduce aggregation in this benchmark"); } } template -std::pair, thrust::device_vector> make_test_data( +std::pair, std::unique_ptr> make_test_data( nvbench::state& state) { auto const column_size{cudf::size_type(state.get_int64("column_size"))}; @@ -72,28 +87,30 @@ std::pair, thrust::device_vector> dtype, distribution_id::UNIFORM, 0, 100); auto input = create_random_column(dtype, row_count{column_size}, profile); - auto offset_it = cudf::detail::make_counting_transform_iterator( - 0, [column_size, segment_length] __device__(auto i) { - return column_size < i * segment_length ? column_size : i * segment_length; - }); - - thrust::device_vector d_offsets(offset_it, offset_it + num_segments + 1); - - return std::pair(std::move(input), d_offsets); + auto offsets = cudf::sequence(num_segments + 1, + cudf::numeric_scalar(0), + cudf::numeric_scalar(segment_length)); + return std::pair(std::move(input), std::move(offsets)); } template -void BM_Simple_Segmented_Reduction(nvbench::state& state, - nvbench::type_list>) +void BM_Segmented_Reduction(nvbench::state& state, + nvbench::type_list>) { auto const column_size{cudf::size_type(state.get_int64("column_size"))}; auto const num_segments{cudf::size_type(state.get_int64("num_segments"))}; auto [input, offsets] = make_test_data(state); - auto agg = make_simple_aggregation(); + auto agg = make_reduce_aggregation(); - auto output_type = is_boolean_output_agg(kind) ? cudf::data_type{cudf::type_id::BOOL8} - : cudf::data_type{cudf::type_to_id()}; + auto const output_type = [] { + if (is_boolean_output_agg(kind)) { return cudf::data_type{cudf::type_id::BOOL8}; } + if (is_float_output_agg(kind)) { return cudf::data_type{cudf::type_id::FLOAT64}; } + if (kind == cudf::segmented_reduce_aggregation::NUNIQUE) { + return cudf::data_type{cudf::type_to_id()}; + } + return cudf::data_type{cudf::type_to_id()}; + }(); state.add_element_count(column_size); state.add_global_memory_reads(column_size); @@ -103,8 +120,10 @@ void BM_Simple_Segmented_Reduction(nvbench::state& state, state.add_global_memory_writes(num_segments); } - auto const input_view = input->view(); - auto const offset_span = cudf::device_span{offsets}; + auto const input_view = input->view(); + auto const offsets_view = offsets->view(); + auto const offset_span = cudf::device_span{ + offsets_view.template data(), static_cast(offsets_view.size())}; state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value())); state.exec( @@ -115,13 +134,17 @@ void BM_Simple_Segmented_Reduction(nvbench::state& state, using Types = nvbench::type_list; // Skip benchmarking MAX/ANY since they are covered by MIN/ALL respectively. +// Also VARIANCE includes STD calculation. using AggKinds = nvbench::enum_type_list; + cudf::aggregation::ALL, + cudf::aggregation::MEAN, + cudf::aggregation::VARIANCE, + cudf::aggregation::NUNIQUE>; -NVBENCH_BENCH_TYPES(BM_Simple_Segmented_Reduction, NVBENCH_TYPE_AXES(Types, AggKinds)) - .set_name("segmented_reduction_simple") +NVBENCH_BENCH_TYPES(BM_Segmented_Reduction, NVBENCH_TYPE_AXES(Types, AggKinds)) + .set_name("segmented_reduction") .set_type_axes_names({"DataType", "AggregationKinds"}) .add_int64_axis("column_size", {100'000, 1'000'000, 10'000'000, 100'000'000}) .add_int64_axis("num_segments", {1'000, 10'000, 100'000}); diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index e269d4d2e13..b688bf3d445 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -535,7 +535,9 @@ class argmin_aggregation final : public rolling_aggregation, public groupby_aggr /** * @brief Derived class for specifying a nunique aggregation */ -class nunique_aggregation final : public groupby_aggregation, public reduce_aggregation { +class nunique_aggregation final : public groupby_aggregation, + public reduce_aggregation, + public segmented_reduce_aggregation { public: nunique_aggregation(null_policy null_handling) : aggregation{NUNIQUE}, _null_handling{null_handling} diff --git a/cpp/include/cudf/reduction/detail/segmented_reduction_functions.hpp b/cpp/include/cudf/reduction/detail/segmented_reduction_functions.hpp index c1bf59e5f65..3902a7200a9 100644 --- a/cpp/include/cudf/reduction/detail/segmented_reduction_functions.hpp +++ b/cpp/include/cudf/reduction/detail/segmented_reduction_functions.hpp @@ -325,6 +325,34 @@ std::unique_ptr segmented_variance(column_view const& col, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); +/** + * @brief Counts the number of unique values within each segment of a column + * + * Unique entries are counted by comparing adjacent values so the column segments + * are expected to be sorted before calling this function otherwise the results + * are undefined. + * + * If any input segment is empty, that segment's result is null. + * + * If `null_handling==null_policy::INCLUDE`, the segment count is the number of + * unique values +1 which includes all the null entries in that segment. + * If `null_handling==null_policy::EXCLUDE`, the segment count does not include nulls. + * + * @throw cudf::logic_error if input column type is a nested type + * + * @param col Input column data + * @param offsets Indices to identify segment boundaries within input `col` + * @param null_handling Specifies how null elements are processed for each segment + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * @return Column of unique counts per segment + */ +std::unique_ptr segmented_nunique(column_view const& col, + device_span offsets, + null_policy null_handling, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + } // namespace detail } // namespace reduction } // namespace cudf diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index 07c53b3a421..2e6a643484e 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -605,6 +605,8 @@ template std::unique_ptr make_nunique_aggregation make_nunique_aggregation( null_policy null_handling); +template std::unique_ptr +make_nunique_aggregation(null_policy null_handling); /// Factory to create an NTH_ELEMENT aggregation template diff --git a/cpp/src/reductions/segmented/nunique.cu b/cpp/src/reductions/segmented/nunique.cu new file mode 100644 index 00000000000..bd1efb41df8 --- /dev/null +++ b/cpp/src/reductions/segmented/nunique.cu @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "update_validity.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace cudf { +namespace reduction { +namespace detail { +namespace { +template +struct is_unique_fn { + column_device_view const d_col; + ComparatorType row_equal; + null_policy null_handling; + size_type const* offsets; + size_type const* labels; + + __device__ size_type operator()(size_type idx) const + { + if (null_handling == null_policy::EXCLUDE && d_col.is_null(idx)) { return 0; } + return static_cast(offsets[labels[idx]] == idx || (!row_equal(idx, idx - 1))); + } +}; +} // namespace + +std::unique_ptr segmented_nunique(column_view const& col, + device_span offsets, + null_policy null_handling, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // only support non-nested types + CUDF_EXPECTS(!cudf::is_nested(col.type()), + "segmented reduce nunique only supports non-nested column types"); + + // compute the unique identifiers within each segment + auto const identifiers = [&] { + auto const d_col = column_device_view::create(col, stream); + auto const comparator = + cudf::experimental::row::equality::self_comparator{table_view({col}), stream}; + auto const row_equal = + comparator.equal_to(cudf::nullate::DYNAMIC{col.has_nulls()}, null_equality::EQUAL); + + auto labels = rmm::device_uvector(col.size(), stream); + cudf::detail::label_segments( + offsets.begin(), offsets.end(), labels.begin(), labels.end(), stream); + auto fn = is_unique_fn{ + *d_col, row_equal, null_handling, offsets.data(), labels.data()}; + + auto identifiers = rmm::device_uvector(col.size(), stream); + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(col.size()), + identifiers.begin(), + fn); + return identifiers; + }(); + + auto result = cudf::make_numeric_column(data_type(type_to_id()), + static_cast(offsets.size() - 1), + cudf::mask_state::UNALLOCATED, + stream, + mr); + + // Sum the unique identifiers within each segment + auto add_op = op::sum{}; + cudf::reduction::detail::segmented_reduce(identifiers.begin(), + offsets.begin(), + offsets.end(), + result->mutable_view().data(), + add_op.get_binary_op(), + 0, + stream); + + // Compute the output null mask + // - only empty segments are tagged as null + // - nulls are counted appropriately above per null_handling policy + auto const bitmask_col = null_handling == null_policy::EXCLUDE ? col : result->view(); + cudf::reduction::detail::segmented_update_validity( + *result, bitmask_col, offsets, null_policy::EXCLUDE, std::nullopt, stream, mr); + + return result; +} +} // namespace detail +} // namespace reduction +} // namespace cudf diff --git a/cpp/src/reductions/segmented/reductions.cpp b/cpp/src/reductions/segmented/reductions.cpp index 66b98fa8322..cee82560794 100644 --- a/cpp/src/reductions/segmented/reductions.cpp +++ b/cpp/src/reductions/segmented/reductions.cpp @@ -95,6 +95,8 @@ struct segmented_reduce_dispatch_functor { return segmented_standard_deviation( col, offsets, output_dtype, null_handling, var_agg._ddof, stream, mr); } + case segmented_reduce_aggregation::NUNIQUE: + return segmented_nunique(col, offsets, null_handling, stream, mr); default: CUDF_FAIL("Unsupported aggregation type."); } } diff --git a/cpp/tests/reductions/segmented_reduction_tests.cpp b/cpp/tests/reductions/segmented_reduction_tests.cpp index 40b0d268580..77fdad09c0b 100644 --- a/cpp/tests/reductions/segmented_reduction_tests.cpp +++ b/cpp/tests/reductions/segmented_reduction_tests.cpp @@ -927,6 +927,48 @@ TEST_F(SegmentedReductionTestUntyped, VarianceNulls) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected); } +TEST_F(SegmentedReductionTestUntyped, NUnique) +{ + auto const input = + cudf::test::fixed_width_column_wrapper({10, 15, 20, 30, 60, 60, 70, 70, 80}); + auto const offsets = std::vector{0, 1, 1, 2, 4, 9}; + auto const d_offsets = cudf::detail::make_device_uvector_async( + offsets, cudf::get_default_stream(), rmm::mr::get_current_device_resource()); + auto const agg = cudf::make_nunique_aggregation(); + auto const output_type = cudf::data_type{cudf::type_id::INT32}; + + auto expected = + cudf::test::fixed_width_column_wrapper{{1, 0, 1, 2, 3}, {1, 0, 1, 1, 1}}; + auto result = + cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::EXCLUDE); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected); + + result = cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::INCLUDE); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected); +} + +TEST_F(SegmentedReductionTestUntyped, NUniqueNulls) +{ + auto const input = cudf::test::fixed_width_column_wrapper( + {10, 0, 20, 30, 60, 60, 70, 70, 0}, {1, 0, 1, 1, 1, 1, 1, 1, 0}); + auto const offsets = std::vector{0, 1, 1, 2, 4, 9}; + auto const d_offsets = cudf::detail::make_device_uvector_async( + offsets, cudf::get_default_stream(), rmm::mr::get_current_device_resource()); + auto const agg = cudf::make_nunique_aggregation(); + auto const output_type = cudf::data_type{cudf::type_id::INT32}; + + auto expected = + cudf::test::fixed_width_column_wrapper{{1, 0, 0, 2, 2}, {1, 0, 0, 1, 1}}; + auto result = + cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::EXCLUDE); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected); + + expected = + cudf::test::fixed_width_column_wrapper{{1, 0, 1, 2, 3}, {1, 0, 1, 1, 1}}; + result = cudf::segmented_reduce(input, d_offsets, *agg, output_type, cudf::null_policy::INCLUDE); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected); +} + TEST_F(SegmentedReductionTestUntyped, Errors) { auto const input = cudf::test::fixed_width_column_wrapper(