diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 56cd626a1ad..eacfba8a029 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -624,11 +624,11 @@ add_library(cudf::cudf ALIAS cudf) add_library( cudftestutil STATIC - tests/io/metadata_utilities.cpp + tests/io/metadata_utilities.cpp tests/quantiles/tdigest_utilities.cu - tests/utilities/base_fixture.cpp + tests/utilities/base_fixture.cpp tests/utilities/column_utilities.cu - tests/utilities/table_utilities.cu + tests/utilities/table_utilities.cu tests/strings/utilities.cpp ) diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 6dc41310b36..54454154086 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,95 @@ namespace detail { namespace tdigest { /** - * @brief Create a tdigest column from it's constituent components. + * @brief Generate a tdigest column from a grouped set of numeric input values. + * + * The tdigest column produced is of the following structure: + * + * struct { + * // centroids for the digest + * list { + * struct { + * double // mean + * double // weight + * }, + * ... + * } + * // these are from the input stream, not the centroids. they are used + * // during the percentile_approx computation near the beginning or + * // end of the quantiles + * double // min + * double // max + * } + * + * Each output row is a single tdigest. The length of the row is the "size" of the + * tdigest, each element of which represents a weighted centroid (mean, weight). + * + * @param values Grouped (and sorted) values to merge. + * @param group_offsets Offsets of groups' starting points within @p values. + * @param group_labels 0-based ID of group that the corresponding value belongs to + * @param group_valid_counts Per-group counts of valid elements. + * @param num_groups Number of groups. + * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher + * values result in a larger, more precise tdigest. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns tdigest column, with 1 tdigest per row + */ +std::unique_ptr group_tdigest(column_view const& values, + cudf::device_span group_offsets, + cudf::device_span group_labels, + cudf::device_span group_valid_counts, + size_type num_groups, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + +/** + * @brief Merges tdigests within the same group to generate a new tdigest. + * + * The tdigest column produced is of the following structure: + * + * struct { + * // centroids for the digest + * list { + * struct { + * double // mean + * double // weight + * }, + * ... + * } + * // these are from the input stream, not the centroids. they are used + * // during the percentile_approx computation near the beginning or + * // end of the quantiles + * double // min + * double // max + * } + * + * Each output row is a single tdigest. The length of the row is the "size" of the + * tdigest, each element of which represents a weighted centroid (mean, weight). + * + * @param values Grouped tdigests to merge. + * @param group_offsets Offsets of groups' starting points within @p values. + * @param group_labels 0-based ID of group that the corresponding value belongs to + * @param num_groups Number of groups. + * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher + * values result in a larger, more precise tdigest. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns tdigest column, with 1 tdigest per row + */ +std::unique_ptr group_merge_tdigest(column_view const& values, + cudf::device_span group_offsets, + cudf::device_span group_labels, + size_type num_groups, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + +/** + * @brief Create a tdigest column from its constituent components. * * @param num_rows The number of rows in the output column. * @param centroid_means The inner means column. These values are partitioned into lists by the @@ -172,7 +260,7 @@ std::unique_ptr group_merge_tdigest(column_view const& values, rmm::mr::device_memory_resource* mr); /** - * @brief Generate a tdigest column from a set of numeric input values. + * @brief Generate a tdigest scalar from a set of numeric input values. * * The tdigest scalar produced is of the following structure: ** struct { @@ -196,7 +284,7 @@ std::unique_ptr group_merge_tdigest(column_view const& values, * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher * values result in a larger, more precise tdigest. * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned column's device memory + * @param mr Device memory resource used to allocate the returned scalar's device memory * * @returns tdigest scalar */ @@ -205,6 +293,40 @@ std::unique_ptr reduce_tdigest(column_view const& values, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); +/** + * @brief Merges multiple tdigest columns to generate a new tdigest scalar. + * + * The tdigest scalar produced is of the following structure: + * + * struct { + * // centroids for the digest + * list { + * struct { + * double // mean + * double // weight + * }, + * ... + * } + * // these are from the input stream, not the centroids. they are used + * // during the percentile_approx computation near the beginning or + * // end of the quantiles + * double // min + * double // max + * } + * + * @param values tdigests to merge. + * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher + * values result in a larger, more precise tdigest. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned scalar's device memory + * + * @returns tdigest column, with 1 tdigest per row + */ +std::unique_ptr reduce_merge_tdigest(column_view const& input, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); + } // namespace tdigest } // namespace detail } // namespace cudf \ No newline at end of file diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index f5cd8982eab..84e3feb82ed 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -52,31 +52,9 @@ struct column_max { } }; -struct tdigest_gen_grouped { - template < - typename T, - typename std::enable_if_t() || cudf::is_fixed_point()>* = nullptr> - std::unique_ptr operator()(column_view const& keys, column_view const& values, int delta) - { - cudf::table_view t({keys}); - cudf::groupby::groupby gb(t); - std::vector requests; - std::vector> aggregations; - aggregations.push_back(cudf::make_tdigest_aggregation(delta)); - requests.push_back({values, std::move(aggregations)}); - auto result = gb.aggregate(requests); - return std::move(result.second[0].results[0]); - } - - template < - typename T, - typename std::enable_if_t() && !cudf::is_fixed_point()>* = nullptr> - std::unique_ptr operator()(column_view const& keys, column_view const& values, int delta) - { - CUDF_FAIL("Invalid tdigest test type"); - } -}; - +/** + * @brief Functor to generate a tdigest. + */ struct tdigest_gen { template < typename T, @@ -97,9 +75,15 @@ struct tdigest_gen { } }; +/** + * @brief Compare a tdigest column against a sampling of expected values. + */ void tdigest_sample_compare(cudf::tdigest::tdigest_column_view const& tdv, std::vector const& h_expected); +/** + * @brief Compare the min/max values of a tdigest against inputs. + */ template void tdigest_minmax_compare(cudf::tdigest::tdigest_column_view const& tdv, column_view const& input_values) @@ -136,11 +120,14 @@ struct expected_tdigest { double min, max; }; +/** + * @brief Create an expected tdigest column given component inputs. + */ std::unique_ptr make_expected_tdigest_column(std::vector const& groups); -// shared tests for groupby/reduction. +// shared test for groupby/reduction. template -void simple_tdigest_aggregation(Func op) +void tdigest_simple_aggregation(Func op) { // create a tdigest that has far fewer values in it than the delta value. this should result // in every value remaining uncompressed @@ -162,8 +149,9 @@ void simple_tdigest_aggregation(Func op) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } +// shared test for groupby/reduction. template -void simple_with_null_tdigest_aggregation(Func op) +void tdigest_simple_with_nulls_aggregation(Func op) { // create a tdigest that has far fewer values in it than the delta value. this should result // in every value remaining uncompressed @@ -186,8 +174,9 @@ void simple_with_null_tdigest_aggregation(Func op) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } +// shared test for groupby/reduction. template -void simple_all_null_tdigest_aggregation(Func op) +void tdigest_simple_all_nulls_aggregation(Func op) { // create a tdigest that has far fewer values in it than the delta value. this should result // in every value remaining uncompressed @@ -203,8 +192,9 @@ void simple_all_null_tdigest_aggregation(Func op) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } +// shared test for groupby/reduction. template -void simple_large_input_double_tdigest_aggregation(Func op) +void tdigest_simple_large_input_double_aggregation(Func op) { // these tests are being done explicitly because of the way we have to precompute the correct // answers. since the input values generated by the generate_distribution() function below are @@ -276,8 +266,9 @@ void simple_large_input_double_tdigest_aggregation(Func op) } } +// shared test for groupby/reduction. template -void simple_large_input_int_tdigest_aggregation(Func op) +void tdigest_simple_large_input_int_aggregation(Func op) { // these tests are being done explicitly because of the way we have to precompute the correct // answers. since the input values generated by the generate_distribution() function below are @@ -352,8 +343,9 @@ void simple_large_input_int_tdigest_aggregation(Func op) } } +// shared test for groupby/reduction. template -void simple_large_input_decimal_tdigest_aggregation(Func op) +void tdigest_simple_large_input_decimal_aggregation(Func op) { // these tests are being done explicitly because of the way we have to precompute the correct // answers. since the input values generated by the generate_distribution() function below are @@ -426,5 +418,82 @@ void simple_large_input_decimal_tdigest_aggregation(Func op) } } +// Note: there is no need to test different types here as the internals of a tdigest are always +// the same regardless of input. +template +void tdigest_merge_simple(Func op, MergeFunc merge_op) +{ + auto values = generate_standardized_percentile_distribution(data_type{type_id::FLOAT64}); + CUDF_EXPECTS(values->size() == 750000, "Unexpected distribution size"); + + auto split_values = cudf::split(*values, {250000, 500000}); + + int const delta = 1000; + + // generate seperate digests + std::vector> parts; + auto iter = thrust::make_counting_iterator(0); + std::transform( + iter, iter + split_values.size(), std::back_inserter(parts), [&split_values, delta, op](int i) { + return op(split_values[i], delta); + }); + std::vector part_views; + std::transform(parts.begin(), + parts.end(), + std::back_inserter(part_views), + [](std::unique_ptr const& col) { return col->view(); }); + + // merge delta = 1000 + { + int const merge_delta = 1000; + + // merge them + auto merge_input = cudf::concatenate(part_views); + auto result = merge_op(*merge_input, merge_delta); + cudf::tdigest::tdigest_column_view tdv(*result); + + // verify centroids + std::vector expected{{0, 0.00013945158577498588, 2}, + {10, 0.04804393446447510763, 50}, + {59, 1.68846964439246893797, 284}, + {250, 33.36323141295877547918, 1479}, + {368, 65.36307727957283475462, 2292}, + {409, 73.95399208218296394080, 1784}, + {490, 87.67566167909056673579, 1570}, + {491, 87.83119717763385381204, 1570}, + {500, 89.24891838334393412424, 1555}, + {578, 95.87182997389099625707, 583}, + {625, 98.20470345147104751504, 405}, + {700, 99.96818381983835877236, 56}, + {711, 99.99970905482754801596, 1}}; + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *values); + } +} + +// shared test for groupby/reduction. +template +void tdigest_merge_empty(MergeFunc merge_op) +{ + // 3 empty tdigests all in the same group + auto a = cudf::detail::tdigest::make_empty_tdigest_column(); + auto b = cudf::detail::tdigest::make_empty_tdigest_column(); + auto c = cudf::detail::tdigest::make_empty_tdigest_column(); + std::vector cols; + cols.push_back(*a); + cols.push_back(*b); + cols.push_back(*c); + auto values = cudf::concatenate(cols); + + auto const delta = 1000; + auto result = merge_op(*values, delta); + + auto expected = cudf::detail::tdigest::make_empty_tdigest_column(); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); +} + } // namespace test } // namespace cudf \ No newline at end of file diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index b9ab3970c98..4904aa42723 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -36,8 +36,6 @@ #include #include -#include - #include #include diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 56765d55c8a..391cb3e215a 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/quantiles/tdigest/tdigest.hpp b/cpp/src/quantiles/tdigest/tdigest.hpp deleted file mode 100644 index 52b19821b90..00000000000 --- a/cpp/src/quantiles/tdigest/tdigest.hpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -namespace cudf { -namespace detail { -namespace tdigest { - -/** - * @brief Generate a tdigest column from a grouped set of numeric input values. - * - * The tdigest column produced is of the following structure: - * - * struct { - * // centroids for the digest - * list { - * struct { - * double // mean - * double // weight - * }, - * ... - * } - * // these are from the input stream, not the centroids. they are used - * // during the percentile_approx computation near the beginning or - * // end of the quantiles - * double // min - * double // max - * } - * - * Each output row is a single tdigest. The length of the row is the "size" of the - * tdigest, each element of which represents a weighted centroid (mean, weight). - * - * @param values Grouped (and sorted) values to merge. - * @param group_offsets Offsets of groups' starting points within @p values. - * @param group_labels 0-based ID of group that the corresponding value belongs to - * @param group_valid_counts Per-group counts of valid elements. - * @param num_groups Number of groups. - * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher - * values result in a larger, more precise tdigest. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @returns tdigest column, with 1 tdigest per row - */ -std::unique_ptr group_tdigest(column_view const& values, - cudf::device_span group_offsets, - cudf::device_span group_labels, - cudf::device_span group_valid_counts, - size_type num_groups, - int max_centroids, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - -/** - * @brief Merges tdigests within the same group to generate a new tdigest. - * - * The tdigest column produced is of the following structure: - * - * struct { - * // centroids for the digest - * list { - * struct { - * double // mean - * double // weight - * }, - * ... - * } - * // these are from the input stream, not the centroids. they are used - * // during the percentile_approx computation near the beginning or - * // end of the quantiles - * double // min - * double // max - * } - * - * Each output row is a single tdigest. The length of the row is the "size" of the - * tdigest, each element of which represents a weighted centroid (mean, weight). - * - * @param values Grouped tdigests to merge. - * @param group_offsets Offsets of groups' starting points within @p values. - * @param group_labels 0-based ID of group that the corresponding value belongs to - * @param num_groups Number of groups. - * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher - * values result in a larger, more precise tdigest. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @returns tdigest column, with 1 tdigest per row - */ -std::unique_ptr group_merge_tdigest(column_view const& values, - cudf::device_span group_offsets, - cudf::device_span group_labels, - size_type num_groups, - int max_centroids, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - -} // namespace tdigest -} // namespace detail -} // namespace cudf diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index be30c0be4ec..3fdb0d55753 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -55,7 +55,7 @@ template struct make_centroid { column_device_view const col; - centroid operator() __device__(size_type index) + centroid operator() __device__(size_type index) const { auto const is_valid = col.is_valid(index); auto const mean = is_valid ? static_cast(col.element(index)) : 0.0; @@ -70,7 +70,7 @@ template struct make_centroid_no_nulls { column_device_view const col; - centroid operator() __device__(size_type index) + centroid operator() __device__(size_type index) const { return {static_cast(col.element(index)), 1.0, true}; } @@ -86,7 +86,7 @@ struct make_weighted_centroid { // merge two centroids struct merge_centroids { - centroid operator() __device__(centroid const& lhs, centroid const& rhs) + centroid operator() __device__(centroid const& lhs, centroid const& rhs) const { bool const lhs_valid = thrust::get<2>(lhs); bool const rhs_valid = thrust::get<2>(rhs); @@ -115,7 +115,7 @@ struct merge_centroids { struct nearest_value_scalar_weights_grouped { offset_type const* group_offsets; - thrust::pair operator() __device__(double next_limit, size_type group_index) + thrust::pair operator() __device__(double next_limit, size_type group_index) const { double const f = floor(next_limit); auto const relative_weight_index = max(0, static_cast(next_limit) - 1); @@ -136,7 +136,7 @@ struct nearest_value_scalar_weights_grouped { struct nearest_value_scalar_weights { size_type const input_size; - thrust::pair operator() __device__(double next_limit, size_type) + thrust::pair operator() __device__(double next_limit, size_type) const { double const f = floor(next_limit); auto const relative_weight_index = max(0, static_cast(next_limit) - 1); @@ -150,12 +150,13 @@ struct nearest_value_scalar_weights { * * This functor assumes we are dealing with grouped, sorted, weighted centroids. */ +template struct nearest_value_centroid_weights { double const* cumulative_weights; - offset_type const* outer_offsets; // groups + GroupOffsetsIter outer_offsets; // groups offset_type const* inner_offsets; // tdigests within a group - thrust::pair operator() __device__(double next_limit, size_type group_index) + thrust::pair operator() __device__(double next_limit, size_type group_index) const { auto const tdigest_begin = outer_offsets[group_index]; auto const tdigest_end = outer_offsets[group_index + 1]; @@ -215,10 +216,11 @@ struct cumulative_scalar_weight { * * This functor assumes we are dealing with grouped, weighted centroids. */ +template struct cumulative_centroid_weight { double const* cumulative_weights; - cudf::device_span group_labels; - offset_type const* outer_offsets; // groups + GroupLabelsIter group_labels; + GroupOffsetsIter outer_offsets; // groups cudf::device_span inner_offsets; // tdigests with a group std::tuple operator() __device__(size_type value_index) const @@ -243,7 +245,7 @@ struct scalar_group_info_grouped { size_type const* group_valid_counts; offset_type const* group_offsets; - __device__ thrust::tuple operator()(size_type group_index) + __device__ thrust::tuple operator()(size_type group_index) const { return {static_cast(group_valid_counts[group_index]), group_offsets[group_index + 1] - group_offsets[group_index], @@ -256,19 +258,20 @@ struct scalar_group_info { double const total_weight; size_type const size; - __device__ thrust::tuple operator()(size_type) + __device__ thrust::tuple operator()(size_type) const { return {total_weight, size, 0}; } }; // retrieve group info of centroid inputs by group index +template struct centroid_group_info { double const* cumulative_weights; - offset_type const* outer_offsets; + GroupOffsetsIter outer_offsets; offset_type const* inner_offsets; - __device__ thrust::tuple operator()(size_type group_index) + __device__ thrust::tuple operator()(size_type group_index) const { // if there's no weights in this group of digests at all, return 0. auto const group_start = inner_offsets[outer_offsets[group_index]]; @@ -283,7 +286,7 @@ struct centroid_group_info { }; struct tdigest_min { - __device__ double operator()(thrust::tuple const& t) + __device__ double operator()(thrust::tuple const& t) const { auto const min = thrust::get<0>(t); auto const size = thrust::get<1>(t); @@ -292,7 +295,7 @@ struct tdigest_min { }; struct tdigest_max { - __device__ double operator()(thrust::tuple const& t) + __device__ double operator()(thrust::tuple const& t) const { auto const max = thrust::get<0>(t); auto const size = thrust::get<1>(t); @@ -307,10 +310,22 @@ __device__ double scale_func_k1(double quantile, double delta_norm) { double k = delta_norm * asin(2.0 * quantile - 1.0); k += 1.0; - double q = (sin(k / delta_norm) + 1.0) / 2.0; + double const q = (sin(k / delta_norm) + 1.0) / 2.0; return q; } +// convert a single-row tdigest column to a scalar. +std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(tdigest->size() == 1, + "Encountered invalid tdigest column when converting to scalar"); + auto contents = tdigest->release(); + return std::make_unique( + std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); +} + /** * @brief Compute a set of cluster limits (brackets, essentially) for a * given tdigest based on the specified delta and the total weight of values @@ -869,12 +884,11 @@ struct typed_reduce_tdigest { // - several of the functors used during the reduction are cheaper than during a groupby. auto const valid_count = col.size() - col.null_count(); - auto const num_groups = 1; // first, generate cluster weight information for each input group auto [cluster_wl, cluster_offsets, total_clusters] = generate_group_cluster_info(delta, - num_groups, + 1, nearest_value_scalar_weights{valid_count}, scalar_group_info{static_cast(valid_count), valid_count}, cumulative_scalar_weight{}, @@ -888,13 +902,13 @@ struct typed_reduce_tdigest { // compute min and max columns auto min_col = cudf::make_numeric_column( - data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); + data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED, stream, mr); auto max_col = cudf::make_numeric_column( - data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); + data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED, stream, mr); thrust::transform( rmm::exec_policy(stream), thrust::make_counting_iterator(0), - thrust::make_counting_iterator(0) + num_groups, + thrust::make_counting_iterator(0) + 1, thrust::make_zip_iterator(thrust::make_tuple(min_col->mutable_view().begin(), max_col->mutable_view().begin())), get_scalar_minmax{*d_col, valid_count}); @@ -904,21 +918,20 @@ struct typed_reduce_tdigest { cudf::detail::make_counting_transform_iterator(0, make_centroid_no_nulls{*d_col}); // generate the final tdigest and wrap it in a struct_scalar - auto tdigest = compute_tdigests(delta, - scalar_to_centroid, - scalar_to_centroid + valid_count, - cumulative_scalar_weight{}, - std::move(min_col), - std::move(max_col), - cluster_wl, - std::move(cluster_offsets), - total_clusters, - false, - stream, - mr); - auto contents = tdigest->release(); - return std::make_unique( - std::move(*std::make_unique
(std::move(contents.children))), true, stream, mr); + return to_tdigest_scalar(compute_tdigests(delta, + scalar_to_centroid, + scalar_to_centroid + valid_count, + cumulative_scalar_weight{}, + std::move(min_col), + std::move(max_col), + cluster_wl, + std::move(cluster_offsets), + total_clusters, + false, + stream, + mr), + stream, + mr); } template < @@ -931,89 +944,67 @@ struct typed_reduce_tdigest { } }; -} // anonymous namespace - -std::unique_ptr reduce_tdigest(column_view const& col, - int max_centroids, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - if (col.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_scalar(stream, mr); } - - // since this isn't coming out of a groupby, we need to sort the inputs in ascending - // order with nulls at the end. - table_view t({col}); - auto sorted = cudf::detail::sort(t, {order::ASCENDING}, {null_order::AFTER}, stream); +// utility for merge_tdigests. +template +struct group_num_weights_func { + GroupOffsetsIter outer_offsets; + size_type const* inner_offsets; - auto const delta = max_centroids; - return cudf::type_dispatcher( - col.type(), typed_reduce_tdigest{}, sorted->get_column(0), delta, stream, mr); -} + __device__ size_type operator()(size_type group_index) + { + auto const tdigest_begin = outer_offsets[group_index]; + auto const tdigest_end = outer_offsets[group_index + 1]; + return inner_offsets[tdigest_end] - inner_offsets[tdigest_begin]; + } +}; -std::unique_ptr group_tdigest(column_view const& col, - cudf::device_span group_offsets, - cudf::device_span group_labels, - cudf::device_span group_valid_counts, - size_type num_groups, - int max_centroids, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - if (col.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); } +// utility for merge_tdigests. +struct group_is_empty { + __device__ bool operator()(size_type group_size) { return group_size == 0; } +}; - auto const delta = max_centroids; - return cudf::type_dispatcher(col.type(), - typed_group_tdigest{}, - col, - group_offsets, - group_labels, - group_valid_counts, - num_groups, - delta, - stream, - mr); -} +// utility for merge_tdigests. +template +struct group_key_func { + GroupLabelsIter group_labels; + size_type const* inner_offsets; + size_type num_inner_offsets; -std::unique_ptr group_merge_tdigest(column_view const& input, - cudf::device_span group_offsets, - cudf::device_span group_labels, - size_type num_groups, - int max_centroids, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - tdigest_column_view tdv(input); + __device__ size_type operator()(size_type index) + { + // what -original- tdigest index this absolute index corresponds to + auto const iter = thrust::prev( + thrust::upper_bound(thrust::seq, inner_offsets, inner_offsets + num_inner_offsets, index)); + auto const tdigest_index = thrust::distance(inner_offsets, iter); - if (num_groups == 0 || input.size() == 0) { - return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); + // what group index the original tdigest belongs to + return group_labels[tdigest_index]; } +}; - // first step is to merge all the tdigests in each group. at the moment the only way to - // make this work is to retrieve the group sizes (via group_offsets) and the individual digest - // sizes (via input.offsets()) to the gpu and do the merges. The scale problem is that while the - // size of each group will likely be small (size of each group will typically map to # of batches - // the input data was chopped into for tdigest generation), the -number- of groups can be - // arbitrarily large. - // +template +std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, + HGroupOffsetIter h_outer_offsets, + GroupOffsetIter group_offsets, + GroupLabelIter group_labels, + size_t num_group_labels, + size_type num_groups, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ // thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an // algorithm like a super-merge that takes two layers of keys: one which identifies the outer // grouping of tdigests, and one which identifies the inner groupings of the tdigests within the // outer groups. - - // bring group offsets back to the host - std::vector h_outer_offsets(group_offsets.size()); - cudaMemcpyAsync(h_outer_offsets.data(), - group_offsets.data(), - sizeof(size_type) * group_offsets.size(), - cudaMemcpyDeviceToHost, - stream); + // TODO: investigate replacing the iterative merge with a single stable_sort_by_key. // bring tdigest offsets back to the host auto tdigest_offsets = tdv.centroids().offsets(); - std::vector h_inner_offsets(tdigest_offsets.size()); + std::vector h_inner_offsets(tdigest_offsets.size()); cudaMemcpyAsync(h_inner_offsets.data(), - tdigest_offsets.begin(), - sizeof(size_type) * tdigest_offsets.size(), + tdigest_offsets.begin(), + sizeof(offset_type) * tdigest_offsets.size(), cudaMemcpyDeviceToHost, stream); @@ -1025,30 +1016,35 @@ std::unique_ptr group_merge_tdigest(column_view const& input, // generate the merged (but not yet compressed) tdigests for each group. std::vector> tdigests; tdigests.reserve(num_groups); - std::transform( - h_outer_offsets.begin(), - h_outer_offsets.end() - 1, - std::next(h_outer_offsets.begin()), - std::back_inserter(tdigests), - [&](auto tdigest_start, auto tdigest_end) { - // the range of tdigests in this group - auto const num_tdigests = tdigest_end - tdigest_start; - - // slice each tdigest from the input - std::vector unmerged_tdigests; - unmerged_tdigests.reserve(num_tdigests); - auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start); - std::transform(offset_iter, + std::transform(h_outer_offsets, + h_outer_offsets + num_groups, + std::next(h_outer_offsets), + std::back_inserter(tdigests), + [&](auto tdigest_start, auto tdigest_end) { + // the range of tdigests in this group + auto const num_tdigests = tdigest_end - tdigest_start; + + // slice each tdigest from the input + std::vector unmerged_tdigests; + unmerged_tdigests.reserve(num_tdigests); + auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start); + std::transform( + offset_iter, offset_iter + num_tdigests, std::next(offset_iter), std::back_inserter(unmerged_tdigests), - [&](auto start, auto end) { + [&](size_type start, size_type end) { return cudf::detail::slice(tdigests_unsliced, {start, end}, stream); }); - // merge - return cudf::detail::merge(unmerged_tdigests, {0}, {order::ASCENDING}, {}, stream, mr); - }); + // merge + return cudf::detail::merge(unmerged_tdigests, + {0}, + {order::ASCENDING}, + {}, + stream, + rmm::mr::get_current_device_resource()); + }); // generate min and max values auto merged_min_col = cudf::make_numeric_column( @@ -1057,8 +1053,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, thrust::make_zip_iterator(thrust::make_tuple(tdv.min_begin(), tdv.size_begin())), tdigest_min{}); thrust::reduce_by_key(rmm::exec_policy(stream), - group_labels.begin(), - group_labels.end(), + group_labels, + group_labels + num_group_labels, min_iter, thrust::make_discard_iterator(), merged_min_col->mutable_view().begin(), @@ -1071,8 +1067,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, thrust::make_zip_iterator(thrust::make_tuple(tdv.max_begin(), tdv.size_begin())), tdigest_max{}); thrust::reduce_by_key(rmm::exec_policy(stream), - group_labels.begin(), - group_labels.end(), + group_labels, + group_labels + num_group_labels, max_iter, thrust::make_discard_iterator(), merged_max_col->mutable_view().begin(), @@ -1083,25 +1079,19 @@ std::unique_ptr group_merge_tdigest(column_view const& input, // testing simpler. auto group_num_weights = cudf::detail::make_counting_transform_iterator( 0, - [outer_offsets = group_offsets.data(), - inner_offsets = - tdigest_offsets.begin()] __device__(size_type group_index) -> size_type { - auto const tdigest_begin = outer_offsets[group_index]; - auto const tdigest_end = outer_offsets[group_index + 1]; - return inner_offsets[tdigest_end] - inner_offsets[tdigest_begin]; - }); - auto group_is_empty = [] __device__(size_type group_size) { return group_size == 0; }; + group_num_weights_func{group_offsets, + tdigest_offsets.begin()}); thrust::replace_if(rmm::exec_policy(stream), merged_min_col->mutable_view().begin(), merged_min_col->mutable_view().end(), group_num_weights, - group_is_empty, + group_is_empty{}, 0); thrust::replace_if(rmm::exec_policy(stream), merged_max_col->mutable_view().begin(), merged_max_col->mutable_view().end(), group_num_weights, - group_is_empty, + group_is_empty{}, 0); // concatenate all the merged tdigests back into one table. @@ -1111,7 +1101,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigests.end(), std::back_inserter(tdigest_views), [](std::unique_ptr
const& t) { return t->view(); }); - auto merged = cudf::detail::concatenate(tdigest_views, stream, mr); + auto merged = cudf::detail::concatenate(tdigest_views, stream); // generate cumulative weights auto merged_weights = merged->get_column(1).view(); @@ -1119,17 +1109,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED); auto keys = cudf::detail::make_counting_transform_iterator( 0, - [group_labels = group_labels.begin(), - inner_offsets = tdigest_offsets.begin(), - num_inner_offsets = tdigest_offsets.size()] __device__(int index) { - // what -original- tdigest index this absolute index corresponds to - auto const iter = thrust::prev( - thrust::upper_bound(thrust::seq, inner_offsets, inner_offsets + num_inner_offsets, index)); - auto const tdigest_index = thrust::distance(inner_offsets, iter); - - // what group index the original tdigest belongs to - return group_labels[tdigest_index]; - }); + group_key_func{ + group_labels, tdigest_offsets.begin(), tdigest_offsets.size()}); thrust::inclusive_scan_by_key(rmm::exec_policy(stream), keys, keys + cumulative_weights->size(), @@ -1142,16 +1123,17 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, num_groups, - nearest_value_centroid_weights{cumulative_weights->view().begin(), - group_offsets.data(), - tdigest_offsets.begin()}, - centroid_group_info{cumulative_weights->view().begin(), - group_offsets.data(), - tdigest_offsets.begin()}, - cumulative_centroid_weight{ + nearest_value_centroid_weights{ + cumulative_weights->view().begin(), + group_offsets, + tdigest_offsets.begin()}, + centroid_group_info{cumulative_weights->view().begin(), + group_offsets, + tdigest_offsets.begin()}, + cumulative_centroid_weight{ cumulative_weights->view().begin(), group_labels, - group_offsets.data(), + group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, false, stream, @@ -1164,22 +1146,126 @@ std::unique_ptr group_merge_tdigest(column_view const& input, merged_weights.begin()}); // compute the tdigest - return compute_tdigests(delta, - centroids, - centroids + merged->num_rows(), - cumulative_centroid_weight{cumulative_weights->view().begin(), - group_labels, - group_offsets.data(), - {tdigest_offsets.begin(), - static_cast(tdigest_offsets.size())}}, - std::move(merged_min_col), - std::move(merged_max_col), - group_cluster_wl, - std::move(group_cluster_offsets), - total_clusters, - false, - stream, - mr); + return compute_tdigests( + delta, + centroids, + centroids + merged->num_rows(), + cumulative_centroid_weight{ + cumulative_weights->view().begin(), + group_labels, + group_offsets, + {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + std::move(merged_min_col), + std::move(merged_max_col), + group_cluster_wl, + std::move(group_cluster_offsets), + total_clusters, + false, + stream, + mr); +} + +} // anonymous namespace + +std::unique_ptr reduce_tdigest(column_view const& col, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (col.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_scalar(stream, mr); } + + // since this isn't coming out of a groupby, we need to sort the inputs in ascending + // order with nulls at the end. + table_view t({col}); + auto sorted = cudf::detail::sort(t, {order::ASCENDING}, {null_order::AFTER}, stream); + + auto const delta = max_centroids; + return cudf::type_dispatcher( + col.type(), typed_reduce_tdigest{}, sorted->get_column(0), delta, stream, mr); +} + +std::unique_ptr reduce_merge_tdigest(column_view const& input, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + tdigest_column_view tdv(input); + + if (input.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_scalar(stream, mr); } + + auto h_group_offsets = cudf::detail::make_counting_transform_iterator( + 0, [size = input.size()](size_type i) { return i == 0 ? 0 : size; }); + auto group_offsets = cudf::detail::make_counting_transform_iterator( + 0, [size = input.size()] __device__(size_type i) { return i == 0 ? 0 : size; }); + auto group_labels = thrust::make_constant_iterator(0); + return to_tdigest_scalar(merge_tdigests(tdv, + h_group_offsets, + group_offsets, + group_labels, + input.size(), + 1, + max_centroids, + stream, + mr), + stream, + mr); +} + +std::unique_ptr group_tdigest(column_view const& col, + cudf::device_span group_offsets, + cudf::device_span group_labels, + cudf::device_span group_valid_counts, + size_type num_groups, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (col.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); } + + auto const delta = max_centroids; + return cudf::type_dispatcher(col.type(), + typed_group_tdigest{}, + col, + group_offsets, + group_labels, + group_valid_counts, + num_groups, + delta, + stream, + mr); +} + +std::unique_ptr group_merge_tdigest(column_view const& input, + cudf::device_span group_offsets, + cudf::device_span group_labels, + size_type num_groups, + int max_centroids, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + tdigest_column_view tdv(input); + + if (num_groups == 0 || input.size() == 0) { + return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); + } + + // bring group offsets back to the host + std::vector h_group_offsets(group_offsets.size()); + cudaMemcpyAsync(h_group_offsets.data(), + group_offsets.begin(), + sizeof(size_type) * group_offsets.size(), + cudaMemcpyDeviceToHost, + stream); + + return merge_tdigests(tdv, + h_group_offsets.begin(), + group_offsets.data(), + group_labels.data(), + group_labels.size(), + num_groups, + max_centroids, + stream, + mr); } } // namespace tdigest diff --git a/cpp/src/reductions/reductions.cpp b/cpp/src/reductions/reductions.cpp index 8f5b7b3d18b..57313e3fd65 100644 --- a/cpp/src/reductions/reductions.cpp +++ b/cpp/src/reductions/reductions.cpp @@ -102,7 +102,7 @@ struct reduce_dispatch_functor { case aggregation::NTH_ELEMENT: { auto nth_agg = dynamic_cast(agg.get()); return reduction::nth_element(col, nth_agg->_n, nth_agg->_null_handling, stream, mr); - } break; + } break; case aggregation::COLLECT_LIST: { auto col_agg = dynamic_cast(agg.get()); return reduction::collect_list(col, col_agg->_null_handling, stream, mr); @@ -123,6 +123,10 @@ struct reduce_dispatch_functor { auto td_agg = dynamic_cast(agg.get()); return detail::tdigest::reduce_tdigest(col, td_agg->max_centroids, stream, mr); } break; + case aggregation::MERGE_TDIGEST: { + auto td_agg = dynamic_cast(agg.get()); + return detail::tdigest::reduce_merge_tdigest(col, td_agg->max_centroids, stream, mr); + } break; default: CUDF_FAIL("Unsupported reduction operator"); } } diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 24013da62b9..1f1221784d6 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -150,8 +150,13 @@ ConfigureTest( # ################################################################################################## # * reduction tests ------------------------------------------------------------------------------- ConfigureTest( - REDUCTION_TEST reductions/collect_ops_tests.cpp reductions/rank_tests.cpp - reductions/reduction_tests.cpp reductions/scan_tests.cpp reductions/segmented_reduction_tests.cpp + REDUCTION_TEST + reductions/collect_ops_tests.cpp + reductions/rank_tests.cpp + reductions/reduction_tests.cpp + reductions/scan_tests.cpp + reductions/segmented_reduction_tests.cpp + reductions/tdigest_tests.cu ) # ################################################################################################## diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index 875a09c019a..7e6199e73c5 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -32,13 +32,41 @@ namespace test { using namespace cudf; -template -struct TDigestAllTypes : public cudf::test::BaseFixture { +/** + * @brief Functor to generate a tdigest by key. + * + */ +struct tdigest_gen_grouped { + template < + typename T, + typename std::enable_if_t() || cudf::is_fixed_point()>* = nullptr> + std::unique_ptr operator()(column_view const& keys, column_view const& values, int delta) + { + cudf::table_view t({keys}); + cudf::groupby::groupby gb(t); + std::vector requests; + std::vector> aggregations; + aggregations.push_back(cudf::make_tdigest_aggregation(delta)); + requests.push_back({values, std::move(aggregations)}); + auto result = gb.aggregate(requests); + return std::move(result.second[0].results[0]); + } + + template < + typename T, + typename std::enable_if_t() && !cudf::is_fixed_point()>* = nullptr> + std::unique_ptr operator()(column_view const& keys, column_view const& values, int delta) + { + CUDF_FAIL("Invalid tdigest test type"); + } }; -TYPED_TEST_SUITE(TDigestAllTypes, cudf::test::NumericTypes); -struct groupby_simple_op { - std::unique_ptr operator()(column_view const& values, int delta) +/** + * @brief Functor for generating a tdigest using groupby with a constant key. + * + */ +struct tdigest_groupby_simple_op { + std::unique_ptr operator()(column_view const& values, int delta) const { // make a simple set of matching keys. auto keys = cudf::make_fixed_width_column( @@ -59,22 +87,54 @@ struct groupby_simple_op { } }; +/** + * @brief Functor for merging tdigests using groupby with a constant key. + * + */ +struct tdigest_groupby_simple_merge_op { + std::unique_ptr operator()(column_view const& merge_values, int merge_delta) const + { + // make a simple set of matching keys. + auto merge_keys = cudf::make_fixed_width_column( + data_type{type_id::INT32}, merge_values.size(), mask_state::UNALLOCATED); + thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), + merge_keys->mutable_view().template begin(), + merge_keys->mutable_view().template end(), + 0); + + cudf::table_view key_table({*merge_keys}); + cudf::groupby::groupby gb(key_table); + std::vector requests; + std::vector> aggregations; + aggregations.push_back( + cudf::make_merge_tdigest_aggregation(merge_delta)); + requests.push_back({merge_values, std::move(aggregations)}); + auto result = gb.aggregate(requests); + return std::move(result.second[0].results[0]); + } +}; + +template +struct TDigestAllTypes : public cudf::test::BaseFixture { +}; +TYPED_TEST_SUITE(TDigestAllTypes, cudf::test::NumericTypes); + TYPED_TEST(TDigestAllTypes, Simple) { using T = TypeParam; - simple_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_aggregation(tdigest_groupby_simple_op{}); } TYPED_TEST(TDigestAllTypes, SimpleWithNulls) { using T = TypeParam; - simple_with_null_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_with_nulls_aggregation(tdigest_groupby_simple_op{}); } TYPED_TEST(TDigestAllTypes, AllNull) { using T = TypeParam; - simple_all_null_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_all_nulls_aggregation(tdigest_groupby_simple_op{}); } TYPED_TEST(TDigestAllTypes, LargeGroups) @@ -174,17 +234,17 @@ TEST_F(TDigestTest, EmptyMixed) TEST_F(TDigestTest, LargeInputDouble) { - simple_large_input_double_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_large_input_double_aggregation(tdigest_groupby_simple_op{}); } TEST_F(TDigestTest, LargeInputInt) { - simple_large_input_int_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_large_input_int_aggregation(tdigest_groupby_simple_op{}); } TEST_F(TDigestTest, LargeInputDecimal) { - simple_large_input_decimal_tdigest_aggregation(groupby_simple_op{}); + tdigest_simple_large_input_decimal_aggregation(tdigest_groupby_simple_op{}); } struct TDigestMergeTest : public cudf::test::BaseFixture { @@ -194,81 +254,7 @@ struct TDigestMergeTest : public cudf::test::BaseFixture { // the same regardless of input. TEST_F(TDigestMergeTest, Simple) { - auto values = generate_standardized_percentile_distribution(data_type{type_id::FLOAT64}); - CUDF_EXPECTS(values->size() == 750000, "Unexpected distribution size"); - // all in the same group - auto keys = cudf::make_fixed_width_column( - data_type{type_id::INT32}, values->size(), mask_state::UNALLOCATED); - thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), - keys->mutable_view().template begin(), - keys->mutable_view().template end(), - 0); - - auto split_values = cudf::split(*values, {250000, 500000}); - auto split_keys = cudf::split(*keys, {250000, 500000}); - - int const delta = 1000; - - // generate seperate digests - std::vector> parts; - auto iter = thrust::make_counting_iterator(0); - std::transform( - iter, - iter + split_values.size(), - std::back_inserter(parts), - [&split_keys, &split_values, delta](int i) { - cudf::table_view t({split_keys[i]}); - cudf::groupby::groupby gb(t); - std::vector requests; - std::vector> aggregations; - aggregations.push_back(cudf::make_tdigest_aggregation(delta)); - requests.push_back({split_values[i], std::move(aggregations)}); - auto result = gb.aggregate(requests); - return std::move(result.second[0].results[0]); - }); - std::vector part_views; - std::transform(parts.begin(), - parts.end(), - std::back_inserter(part_views), - [](std::unique_ptr const& col) { return col->view(); }); - - // merge delta = 1000 - { - int const merge_delta = 1000; - - // merge them - auto merge_input = cudf::concatenate(part_views); - cudf::test::fixed_width_column_wrapper merge_keys{0, 0, 0}; - cudf::table_view key_table({merge_keys}); - cudf::groupby::groupby gb(key_table); - std::vector requests; - std::vector> aggregations; - aggregations.push_back( - cudf::make_merge_tdigest_aggregation(merge_delta)); - requests.push_back({*merge_input, std::move(aggregations)}); - auto result = gb.aggregate(requests); - - cudf::tdigest::tdigest_column_view tdv(*result.second[0].results[0]); - - // verify centroids - std::vector expected{{0, 0.00013945158577498588, 2}, - {10, 0.04804393446447510763, 50}, - {59, 1.68846964439246893797, 284}, - {250, 33.36323141295877547918, 1479}, - {368, 65.36307727957283475462, 2292}, - {409, 73.95399208218296394080, 1784}, - {490, 87.67566167909056673579, 1570}, - {491, 87.83119717763385381204, 1570}, - {500, 89.24891838334393412424, 1555}, - {578, 95.87182997389099625707, 583}, - {625, 98.20470345147104751504, 405}, - {700, 99.96818381983835877236, 56}, - {711, 99.99970905482754801596, 1}}; - tdigest_sample_compare(tdv, expected); - - // verify min/max - tdigest_minmax_compare(tdv, *values); - } + tdigest_merge_simple(tdigest_groupby_simple_op{}, tdigest_groupby_simple_merge_op{}); } struct key_groups { @@ -466,32 +452,7 @@ TEST_F(TDigestMergeTest, Grouped) } } -TEST_F(TDigestMergeTest, Empty) -{ - // 3 empty tdigests all in the same group - auto a = cudf::detail::tdigest::make_empty_tdigest_column(); - auto b = cudf::detail::tdigest::make_empty_tdigest_column(); - auto c = cudf::detail::tdigest::make_empty_tdigest_column(); - std::vector cols; - cols.push_back(*a); - cols.push_back(*b); - cols.push_back(*c); - auto values = cudf::concatenate(cols); - cudf::test::fixed_width_column_wrapper keys{0, 0, 0}; - - auto const delta = 1000; - cudf::table_view t({keys}); - cudf::groupby::groupby gb(t); - std::vector requests; - std::vector> aggregations; - aggregations.push_back(cudf::make_merge_tdigest_aggregation(delta)); - requests.push_back({*values, std::move(aggregations)}); - auto result = gb.aggregate(requests); - - auto expected = cudf::detail::tdigest::make_empty_tdigest_column(); - - CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); -} +TEST_F(TDigestMergeTest, Empty) { tdigest_merge_empty(tdigest_groupby_simple_merge_op{}); } TEST_F(TDigestMergeTest, EmptyGroups) { diff --git a/cpp/tests/reductions/reduction_tests.cpp b/cpp/tests/reductions/reduction_tests.cpp index 19317a0e065..0b90c241f31 100644 --- a/cpp/tests/reductions/reduction_tests.cpp +++ b/cpp/tests/reductions/reduction_tests.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -2474,45 +2473,4 @@ TEST_F(StructReductionTest, StructReductionMinMaxWithNulls) } } -template -struct ReductionTDigestAllTypes : public cudf::test::BaseFixture { -}; -TYPED_TEST_SUITE(ReductionTDigestAllTypes, cudf::test::NumericTypes); - -struct reduce_op { - std::unique_ptr operator()(cudf::column_view const& values, int delta) - { - // result is a scalar, but we want to extract out the underlying column - auto scalar_result = - cudf::reduce(values, - cudf::make_tdigest_aggregation(delta), - cudf::data_type{cudf::type_id::FLOAT64}); - auto tbl = static_cast(scalar_result.get())->view(); - std::vector> cols; - std::transform( - tbl.begin(), tbl.end(), std::back_inserter(cols), [](cudf::column_view const& col) { - return std::make_unique(col); - }); - return cudf::make_structs_column(tbl.num_rows(), std::move(cols), 0, rmm::device_buffer()); - } -}; - -TYPED_TEST(ReductionTDigestAllTypes, Simple) -{ - using T = TypeParam; - cudf::test::simple_tdigest_aggregation(reduce_op{}); -} - -TYPED_TEST(ReductionTDigestAllTypes, SimpleWithNulls) -{ - using T = TypeParam; - cudf::test::simple_with_null_tdigest_aggregation(reduce_op{}); -} - -TYPED_TEST(ReductionTDigestAllTypes, AllNull) -{ - using T = TypeParam; - cudf::test::simple_all_null_tdigest_aggregation(reduce_op{}); -} - CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/reductions/tdigest_tests.cu b/cpp/tests/reductions/tdigest_tests.cu new file mode 100644 index 00000000000..12e94ffd0f1 --- /dev/null +++ b/cpp/tests/reductions/tdigest_tests.cu @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +namespace cudf { +namespace test { + +template +struct ReductionTDigestAllTypes : public cudf::test::BaseFixture { +}; +TYPED_TEST_SUITE(ReductionTDigestAllTypes, cudf::test::NumericTypes); + +struct reduce_op { + std::unique_ptr operator()(cudf::column_view const& values, int delta) const + { + // result is a scalar, but we want to extract out the underlying column + auto scalar_result = + cudf::reduce(values, + cudf::make_tdigest_aggregation(delta), + cudf::data_type{cudf::type_id::FLOAT64}); + auto tbl = static_cast(scalar_result.get())->view(); + std::vector> cols; + std::transform( + tbl.begin(), tbl.end(), std::back_inserter(cols), [](cudf::column_view const& col) { + return std::make_unique(col); + }); + return cudf::make_structs_column(tbl.num_rows(), std::move(cols), 0, rmm::device_buffer()); + } +}; + +struct reduce_merge_op { + std::unique_ptr operator()(cudf::column_view const& values, int delta) const + { + // result is a scalar, but we want to extract out the underlying column + auto scalar_result = + cudf::reduce(values, + cudf::make_merge_tdigest_aggregation(delta), + cudf::data_type{cudf::type_id::FLOAT64}); + auto tbl = static_cast(scalar_result.get())->view(); + std::vector> cols; + std::transform( + tbl.begin(), tbl.end(), std::back_inserter(cols), [](cudf::column_view const& col) { + return std::make_unique(col); + }); + return cudf::make_structs_column(tbl.num_rows(), std::move(cols), 0, rmm::device_buffer()); + } +}; + +TYPED_TEST(ReductionTDigestAllTypes, Simple) +{ + using T = TypeParam; + tdigest_simple_aggregation(reduce_op{}); +} + +TYPED_TEST(ReductionTDigestAllTypes, SimpleWithNulls) +{ + using T = TypeParam; + tdigest_simple_with_nulls_aggregation(reduce_op{}); +} + +TYPED_TEST(ReductionTDigestAllTypes, AllNull) +{ + using T = TypeParam; + tdigest_simple_all_nulls_aggregation(reduce_op{}); +} + +struct ReductionTDigestMerge : public cudf::test::BaseFixture { +}; + +TEST_F(ReductionTDigestMerge, Simple) { tdigest_merge_simple(reduce_op{}, reduce_merge_op{}); } + +} // namespace test +} // namespace cudf \ No newline at end of file