From 4beee70374fdab4f97af87ac2ed65e2ad51905d1 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Mon, 18 Oct 2021 12:44:05 -0500 Subject: [PATCH] Improvements to tdigest aggregation code. (#9403) Addresses comments from initial PR (https://github.com/rapidsai/cudf/pull/8983). Specifically implementing a tdigest_column_view for more cleanly accessing the various sub-columns of a tdigest column. Includes several bounds checking fixes for empty groups. Addresses an issue where entirely empty digests could potentially lead to an incorrect min/max values, which isn't technically _wrong_ but makes constructing test cases tricky. Authors: - https://github.com/nvdbaranec Approvers: - AJ Schmidt (https://github.com/ajschmidt8) - Robert (Bobby) Evans (https://github.com/revans2) - Jake Hemstad (https://github.com/jrhemstad) - MithunR (https://github.com/mythrocks) - Mike Wilson (https://github.com/hyperbolic2346) URL: https://github.com/rapidsai/cudf/pull/9403 --- conda/recipes/libcudf/meta.yaml | 1 + cpp/CMakeLists.txt | 1 + cpp/include/cudf/detail/quantiles.hpp | 5 +- cpp/include/cudf/detail/tdigest/tdigest.hpp | 54 +- cpp/include/cudf/quantiles.hpp | 4 +- .../cudf/tdigest/tdigest_column_view.cuh | 126 +++++ cpp/src/groupby/sort/group_tdigest.cu | 140 +++-- cpp/src/quantiles/tdigest/tdigest.cu | 142 +++-- .../quantiles/tdigest/tdigest_column_view.cpp | 80 +++ cpp/tests/groupby/tdigest_tests.cu | 516 ++++++++++++++++-- cpp/tests/quantiles/percentile_approx_test.cu | 22 +- java/src/main/native/src/ColumnViewJni.cpp | 17 +- 12 files changed, 871 insertions(+), 237 deletions(-) create mode 100644 cpp/include/cudf/tdigest/tdigest_column_view.cuh create mode 100644 cpp/src/quantiles/tdigest/tdigest_column_view.cpp diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index fd687de6698..e0b94efa210 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -218,6 +218,7 @@ test: - test -f $PREFIX/include/cudf/structs/detail/concatenate.hpp - test -f $PREFIX/include/cudf/table/table.hpp - test -f $PREFIX/include/cudf/table/table_view.hpp + - test -f $PREFIX/include/cudf/tdigest/tdigest_column_view.cuh - test -f $PREFIX/include/cudf/transform.hpp - test -f $PREFIX/include/cudf/transpose.hpp - test -f $PREFIX/include/cudf/types.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f8c2c16b9d8..3bfdf23c2bc 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -321,6 +321,7 @@ add_library(cudf src/partitioning/partitioning.cu src/partitioning/round_robin.cu src/quantiles/tdigest/tdigest.cu + src/quantiles/tdigest/tdigest_column_view.cpp src/quantiles/quantile.cu src/quantiles/quantiles.cu src/reductions/all.cu diff --git a/cpp/include/cudf/detail/quantiles.hpp b/cpp/include/cudf/detail/quantiles.hpp index 7a76f9cab88..c9e1dc0c776 100644 --- a/cpp/include/cudf/detail/quantiles.hpp +++ b/cpp/include/cudf/detail/quantiles.hpp @@ -16,6 +16,7 @@ #pragma once #include +#include #include @@ -52,13 +53,13 @@ std::unique_ptr quantiles( rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** - * @copydoc cudf::percentile_approx(column_view const&, column_view const&, + * @copydoc cudf::percentile_approx(tdigest_column_view const&, column_view const&, * rmm::mr::device_memory_resource*) * * @param stream CUDA stream used for device memory operations and kernel launches. */ std::unique_ptr percentile_approx( - column_view const& input, + tdigest::tdigest_column_view const& input, column_view const& percentiles, rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 94c22911c1e..852f95be96b 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -23,42 +23,32 @@ namespace detail { namespace tdigest { -// mean and weight column indices within tdigest inner struct columns -constexpr size_type mean_column_index = 0; -constexpr size_type weight_column_index = 1; - -// min and max column indices within tdigest outer struct columns -constexpr size_type centroid_column_index = 0; -constexpr size_type min_column_index = 1; -constexpr size_type max_column_index = 2; - /** - * @brief Verifies that the input column is a valid tdigest column. - * - * struct { - * // centroids for the digest - * list { - * struct { - * double // mean - * double // weight - * }, - * ... - * } - * // these are from the input stream, not the centroids. they are used - * // during the percentile_approx computation near the beginning or - * // end of the quantiles - * double // min - * double // max - * } - * - * Each output row is a single tdigest. The length of the row is the "size" of the - * tdigest, each element of which represents a weighted centroid (mean, weight). + * @brief Create a tdigest column from it's constituent components. * - * @param col Column to be checkeed + * @param num_rows The number of rows in the output column. + * @param centroid_means The inner means column. These values are partitioned into lists by the + * `tdigest_offsets` column. + * @param centroid_weights The inner weights column. These values are partitioned into lists by the + * `tdigest_offsets` column. + * @param tdigest_offsets Offsets representing each individual tdigest in the output column. The + * offsets partition the centroid means and weights. + * @param min_values Column representing the minimum input value for each tdigest. + * @param max_values Column representing the maximum input value for each tdigest. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. * - * @throws cudf::logic error if the column is not a valid tdigest column. + * @returns The constructed tdigest column. */ -void check_is_valid_tdigest_column(column_view const& col); +std::unique_ptr make_tdigest_column( + size_type num_rows, + std::unique_ptr&& centroid_means, + std::unique_ptr&& centroid_weights, + std::unique_ptr&& tdigest_offsets, + std::unique_ptr&& min_values, + std::unique_ptr&& max_values, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** * @brief Create an empty tdigest column. diff --git a/cpp/include/cudf/quantiles.hpp b/cpp/include/cudf/quantiles.hpp index 092a44a9b04..6aa72de8bc7 100644 --- a/cpp/include/cudf/quantiles.hpp +++ b/cpp/include/cudf/quantiles.hpp @@ -17,8 +17,8 @@ #pragma once #include -#include #include +#include #include namespace cudf { @@ -121,7 +121,7 @@ std::unique_ptr
quantiles( * @returns LIST Column containing requested percentile values as FLOAT64. */ std::unique_ptr percentile_approx( - structs_column_view const& input, + tdigest::tdigest_column_view const& input, column_view const& percentiles, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); diff --git a/cpp/include/cudf/tdigest/tdigest_column_view.cuh b/cpp/include/cudf/tdigest/tdigest_column_view.cuh new file mode 100644 index 00000000000..c7513452387 --- /dev/null +++ b/cpp/include/cudf/tdigest/tdigest_column_view.cuh @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +namespace cudf { +namespace tdigest { + +struct tdigest_size { + size_type const* offsets; + __device__ size_type operator()(size_type tdigest_index) + { + return offsets[tdigest_index + 1] - offsets[tdigest_index]; + } +}; + +/** + * @brief Given a column_view containing tdigest data, an instance of this class + * provides a wrapper on the compound column for tdigest operations. + * + * A tdigest is a "compressed" set of input scalars represented as a sorted + * set of centroids (https://arxiv.org/pdf/1902.04023.pdf). + * This data can be queried for quantile information. Each row in a tdigest + * column represents an entire tdigest. + * + * The column has the following structure: + * + * struct { + * // centroids for the digest + * list { + * struct { + * double // mean + * double // weight + * } + * } + * // these are from the input stream, not the centroids. they are used + * // during the percentile_approx computation near the beginning or + * // end of the quantiles + * double // min + * double // max + * } + */ +class tdigest_column_view : private column_view { + public: + tdigest_column_view(column_view const& col); + tdigest_column_view(tdigest_column_view&& tdigest_view) = default; + tdigest_column_view(const tdigest_column_view& tdigest_view) = default; + ~tdigest_column_view() = default; + tdigest_column_view& operator=(tdigest_column_view const&) = default; + tdigest_column_view& operator=(tdigest_column_view&&) = default; + + using column_view::size; + static_assert(std::is_same_v, + "offset_type is expected to be the same as size_type."); + using offset_iterator = offset_type const*; + + // mean and weight column indices within tdigest inner struct columns + static constexpr size_type mean_column_index{0}; + static constexpr size_type weight_column_index{1}; + + // min and max column indices within tdigest outer struct columns + static constexpr size_type centroid_column_index{0}; + static constexpr size_type min_column_index{1}; + static constexpr size_type max_column_index{2}; + + /** + * @brief Returns the parent column. + */ + column_view parent() const; + + /** + * @brief Returns the column of centroids + */ + lists_column_view centroids() const; + + /** + * @brief Returns the internal column of mean values + */ + column_view means() const; + + /** + * @brief Returns the internal column of weight values + */ + column_view weights() const; + + /** + * @brief Returns an iterator that returns the size of each tdigest + * in the column (each row is 1 digest) + */ + auto size_begin() const + { + return cudf::detail::make_counting_transform_iterator( + 0, tdigest_size{centroids().offsets_begin()}); + } + + /** + * @brief Returns the first min value for the column. Each row corresponds + * to the minimum value for the accompanying digest. + */ + double const* min_begin() const; + + /** + * @brief Returns the first max value for the column. Each row corresponds + * to the maximum value for the accompanying digest. + */ + double const* max_begin() const; +}; + +} // namespace tdigest +} // namespace cudf diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index 0738e4c5730..d2bc0f42bda 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -25,10 +25,10 @@ #include #include #include +#include #include #include -#include #include #include @@ -40,6 +40,8 @@ namespace cudf { namespace groupby { namespace detail { +using namespace cudf::tdigest; + namespace { // the most representative point within a cluster of similar @@ -119,6 +121,9 @@ struct nearest_value_centroid_weights { auto const tdigest_begin = outer_offsets[group_index]; auto const tdigest_end = outer_offsets[group_index + 1]; auto const num_weights = inner_offsets[tdigest_end] - inner_offsets[tdigest_begin]; + // NOTE: as it is today, this functor will never be called for any digests that are empty, but + // I'll leave this check here for safety. + if (num_weights == 0) { return thrust::pair{0, 0}; } double const* group_cumulative_weights = cumulative_weights + inner_offsets[tdigest_begin]; auto const index = ((thrust::lower_bound(thrust::seq, @@ -179,6 +184,24 @@ struct cumulative_centroid_weight { } }; +struct tdigest_min { + __device__ double operator()(thrust::tuple const& t) + { + auto const min = thrust::get<0>(t); + auto const size = thrust::get<1>(t); + return size > 0 ? min : std::numeric_limits::max(); + } +}; + +struct tdigest_max { + __device__ double operator()(thrust::tuple const& t) + { + auto const max = thrust::get<0>(t); + auto const size = thrust::get<1>(t); + return size > 0 ? max : std::numeric_limits::lowest(); + } +}; + // a monotonically increasing scale function which produces a distribution // of centroids that is more densely packed in the middle of the input // than at the ends. @@ -446,18 +469,7 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - // if (total_clusters == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); } - std::vector> inner_children; - // mean - inner_children.push_back(cudf::make_fixed_width_column( - data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr)); - // weight - inner_children.push_back(cudf::make_fixed_width_column( - data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr)); - // tdigest struct - auto tdigests = - cudf::make_structs_column(total_clusters, std::move(inner_children), 0, {}, stream, mr); // each input group represents an individual tdigest. within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -469,14 +481,17 @@ std::unique_ptr compute_tdigests(int delta, group_cluster_wl = group_cluster_wl.data(), group_cluster_offsets = group_cluster_offsets->view().begin(), group_cumulative_weight] __device__(size_type value_index) -> size_type { + // get group index, relative value index within the group and cumulative weight. auto [group_index, relative_value_index, cumulative_weight] = group_cumulative_weight(value_index); (void)relative_value_index; - // compute start of cluster weight limits for this group - double const* weight_limits = group_cluster_wl + group_cluster_offsets[group_index]; auto const num_clusters = group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index]; + if (num_clusters == 0) { return group_cluster_offsets[group_index]; } + + // compute start of cluster weight limits for this group + double const* weight_limits = group_cluster_wl + group_cluster_offsets[group_index]; // local cluster index size_type const group_cluster_index = @@ -490,11 +505,14 @@ std::unique_ptr compute_tdigests(int delta, return group_cluster_index + group_cluster_offsets[group_index]; }); + // mean and weight data + auto centroid_means = cudf::make_fixed_width_column( + data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr); + auto centroid_weights = cudf::make_fixed_width_column( + data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr); // reduce the centroids down by key. - cudf::mutable_column_view mean_col = - tdigests->child(cudf::detail::tdigest::mean_column_index).mutable_view(); - cudf::mutable_column_view weight_col = - tdigests->child(cudf::detail::tdigest::weight_column_index).mutable_view(); + cudf::mutable_column_view mean_col(*centroid_means); + cudf::mutable_column_view weight_col(*centroid_weights); auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); auto const num_values = std::distance(centroids_begin, centroids_end); @@ -507,17 +525,16 @@ std::unique_ptr compute_tdigests(int delta, thrust::equal_to{}, // key equality check merge_centroids{}); - // create the list - auto const num_groups = group_cluster_offsets->size() - 1; - auto list = cudf::make_lists_column( - num_groups, std::move(group_cluster_offsets), std::move(tdigests), 0, {}); - // create final tdigest column - std::vector> children; - children.push_back(std::move(list)); - children.push_back(std::move(min_col)); - children.push_back(std::move(max_col)); - return make_structs_column(num_groups, std::move(children), 0, {}, stream, mr); + auto const num_output_rows = group_cluster_offsets->size() - 1; + return cudf::detail::tdigest::make_tdigest_column(num_output_rows, + std::move(centroid_means), + std::move(centroid_weights), + std::move(group_cluster_offsets), + std::move(min_col), + std::move(max_col), + stream, + mr); } // retrieve total weight of scalar inputs by group index @@ -648,22 +665,12 @@ std::unique_ptr group_merge_tdigest(column_view const& input, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - cudf::detail::tdigest::check_is_valid_tdigest_column(input); + tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); } - structs_column_view scv(input); - lists_column_view lcv(scv.child(cudf::detail::tdigest::centroid_column_index)); - // ideally, we would just call .parent().child() here because tdigests cannot be - // sliced. however, lists_column_view() hides that particular interface. However, - // for the same reason, get_sliced_child() should be just as cheap. - auto data = lcv.get_sliced_child(stream); - structs_column_view tdigest(data); - auto mean = tdigest.child(cudf::detail::tdigest::mean_column_index); - auto weight = tdigest.child(cudf::detail::tdigest::weight_column_index); - // first step is to merge all the tdigests in each group. at the moment the only way to // make this work is to retrieve the group sizes (via group_offsets) and the individual digest // sizes (via input.offsets()) to the gpu and do the merges. The scale problem is that while the @@ -685,7 +692,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, stream); // bring tdigest offsets back to the host - auto tdigest_offsets = lcv.offsets(); + auto tdigest_offsets = tdv.centroids().offsets(); std::vector h_inner_offsets(tdigest_offsets.size()); cudaMemcpyAsync(h_inner_offsets.data(), tdigest_offsets.begin(), @@ -696,7 +703,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, stream.synchronize(); // extract all means and weights into a table - cudf::table_view tdigests_unsliced({mean, weight}); + cudf::table_view tdigests_unsliced({tdv.means(), tdv.weights()}); // generate the merged (but not yet compressed) tdigests for each group. std::vector> tdigests; @@ -727,30 +734,59 @@ std::unique_ptr group_merge_tdigest(column_view const& input, }); // generate min and max values - auto min_col = scv.child(cudf::detail::tdigest::min_column_index); auto merged_min_col = cudf::make_fixed_width_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); + auto min_iter = thrust::make_transform_iterator( + thrust::make_zip_iterator(thrust::make_tuple(tdv.min_begin(), tdv.size_begin())), + tdigest_min{}); thrust::reduce_by_key(rmm::exec_policy(stream), group_labels.begin(), group_labels.end(), - min_col.begin(), + min_iter, thrust::make_discard_iterator(), merged_min_col->mutable_view().begin(), thrust::equal_to{}, // key equality check thrust::minimum{}); - auto max_col = scv.child(cudf::detail::tdigest::max_column_index); auto merged_max_col = cudf::make_fixed_width_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); + auto max_iter = thrust::make_transform_iterator( + thrust::make_zip_iterator(thrust::make_tuple(tdv.max_begin(), tdv.size_begin())), + tdigest_max{}); thrust::reduce_by_key(rmm::exec_policy(stream), group_labels.begin(), group_labels.end(), - max_col.begin(), + max_iter, thrust::make_discard_iterator(), merged_max_col->mutable_view().begin(), thrust::equal_to{}, // key equality check thrust::maximum{}); + // for any empty groups, set the min and max to be 0. not technically necessary but it makes + // testing simpler. + auto group_num_weights = cudf::detail::make_counting_transform_iterator( + 0, + [outer_offsets = group_offsets.data(), + inner_offsets = + tdigest_offsets.begin()] __device__(size_type group_index) -> size_type { + auto const tdigest_begin = outer_offsets[group_index]; + auto const tdigest_end = outer_offsets[group_index + 1]; + return inner_offsets[tdigest_end] - inner_offsets[tdigest_begin]; + }); + auto group_is_empty = [] __device__(size_type group_size) { return group_size == 0; }; + thrust::replace_if(rmm::exec_policy(stream), + merged_min_col->mutable_view().begin(), + merged_min_col->mutable_view().end(), + group_num_weights, + group_is_empty, + 0); + thrust::replace_if(rmm::exec_policy(stream), + merged_max_col->mutable_view().begin(), + merged_max_col->mutable_view().end(), + group_num_weights, + group_is_empty, + 0); + // concatenate all the merged tdigests back into one table. std::vector tdigest_views; tdigest_views.reserve(num_groups); @@ -761,7 +797,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto merged = cudf::detail::concatenate(tdigest_views, stream, mr); // generate cumulative weights - auto merged_weights = merged->get_column(cudf::detail::tdigest::weight_column_index).view(); + auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_fixed_width_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED); auto keys = cudf::detail::make_counting_transform_iterator( @@ -791,9 +827,12 @@ std::unique_ptr group_merge_tdigest(column_view const& input, [outer_offsets = group_offsets.data(), inner_offsets = tdigest_offsets.begin(), cumulative_weights = - cumulative_weights->view().begin()] __device__(size_type group_index) { + cumulative_weights->view().begin()] __device__(size_type group_index) -> double { + // if there's no weights in this group of digests at all, return 0. + auto const num_weights = + inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; - return cumulative_weights[last_weight_index]; + return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; }); auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -813,9 +852,8 @@ std::unique_ptr group_merge_tdigest(column_view const& input, // input centroid values auto centroids = cudf::detail::make_counting_transform_iterator( 0, - make_weighted_centroid{ - merged->get_column(cudf::detail::tdigest::mean_column_index).view().begin(), - merged_weights.begin()}); + make_weighted_centroid{merged->get_column(0).view().begin(), + merged_weights.begin()}); // compute the tdigest return compute_tdigests(delta, diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 9aea59a195b..2af7a5e5cde 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -20,7 +20,7 @@ #include #include #include -#include +#include #include #include @@ -28,6 +28,8 @@ #include +using namespace cudf::tdigest; + namespace cudf { namespace detail { namespace tdigest { @@ -166,27 +168,20 @@ __global__ void compute_percentiles_kernel(device_span tdiges * * @returns Column of doubles containing requested percentile values. */ -std::unique_ptr compute_approx_percentiles(structs_column_view const& input, +std::unique_ptr compute_approx_percentiles(tdigest_column_view const& input, column_view const& percentiles, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - lists_column_view lcv(input.child(centroid_column_index)); - column_view min_col = input.child(min_column_index); - column_view max_col = input.child(max_column_index); + tdigest_column_view tdv(input); // offsets, representing the size of each tdigest - auto offsets = lcv.offsets(); - - // extract means and weights - auto data = lcv.parent().child(lists_column_view::child_column_index); - structs_column_view tdigest(data); - auto mean = tdigest.child(mean_column_index); - auto weight = tdigest.child(weight_column_index); + auto offsets = tdv.centroids().offsets(); // compute summed weights + auto weight = tdv.weights(); auto cumulative_weights = cudf::make_fixed_width_column(data_type{type_id::FLOAT64}, - mean.size(), + weight.size(), mask_state::UNALLOCATED, stream, rmm::mr::get_current_device_resource()); @@ -225,7 +220,7 @@ std::unique_ptr compute_approx_percentiles(structs_column_view const& in data_type{type_id::FLOAT64}, num_output_values, std::move(null_mask), null_count, stream, mr); auto centroids = cudf::detail::make_counting_transform_iterator( - 0, make_centroid{mean.begin(), weight.begin()}); + 0, make_centroid{tdv.means().begin(), tdv.weights().begin()}); constexpr size_type block_size = 256; cudf::detail::grid_1d const grid(percentiles.size() * input.size(), block_size); @@ -233,60 +228,61 @@ std::unique_ptr compute_approx_percentiles(structs_column_view const& in {offsets.begin(), static_cast(offsets.size())}, *percentiles_cdv, centroids, - min_col.begin(), - max_col.begin(), + tdv.min_begin(), + tdv.max_begin(), cumulative_weights->view().begin(), result->mutable_view().begin()); return result; } -void check_is_valid_tdigest_column(column_view const& col) +std::unique_ptr make_tdigest_column(size_type num_rows, + std::unique_ptr&& centroid_means, + std::unique_ptr&& centroid_weights, + std::unique_ptr&& tdigest_offsets, + std::unique_ptr&& min_values, + std::unique_ptr&& max_values, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - // sanity check that this is actually tdigest data - CUDF_EXPECTS(col.type().id() == type_id::STRUCT, "Encountered invalid tdigest column"); - CUDF_EXPECTS(col.size() > 0, "tdigest columns must have > 0 rows"); - CUDF_EXPECTS(col.offset() == 0, "Encountered a sliced tdigest column"); - CUDF_EXPECTS(col.nullable() == false, "Encountered nullable tdigest column"); - - structs_column_view scv(col); - CUDF_EXPECTS(scv.num_children() == 3, "Encountered invalid tdigest column"); - CUDF_EXPECTS(scv.child(min_column_index).type().id() == type_id::FLOAT64, - "Encountered invalid tdigest column"); - CUDF_EXPECTS(scv.child(max_column_index).type().id() == type_id::FLOAT64, - "Encountered invalid tdigest column"); - - lists_column_view lcv(scv.child(centroid_column_index)); - auto data = lcv.child(); - CUDF_EXPECTS(data.type().id() == type_id::STRUCT, "Encountered invalid tdigest column"); - CUDF_EXPECTS(data.num_children() == 2, - "Encountered tdigest column with an invalid number of children"); - auto mean = data.child(mean_column_index); - CUDF_EXPECTS(mean.type().id() == type_id::FLOAT64, "Encountered invalid tdigest mean column"); - auto weight = data.child(weight_column_index); - CUDF_EXPECTS(weight.type().id() == type_id::FLOAT64, "Encountered invalid tdigest weight column"); + CUDF_EXPECTS(tdigest_offsets->size() == num_rows + 1, + "Encountered unexpected offset count in make_tdigest_column"); + CUDF_EXPECTS(centroid_means->size() == centroid_weights->size(), + "Encountered unexpected centroid size mismatch in make_tdigest_column"); + CUDF_EXPECTS(min_values->size() == num_rows, + "Encountered unexpected min value count in make_tdigest_column"); + CUDF_EXPECTS(max_values->size() == num_rows, + "Encountered unexpected max value count in make_tdigest_column"); + + // inner struct column + auto const centroids_size = centroid_means->size(); + std::vector> inner_children; + inner_children.push_back(std::move(centroid_means)); + inner_children.push_back(std::move(centroid_weights)); + auto tdigest_data = + cudf::make_structs_column(centroids_size, std::move(inner_children), 0, {}, stream, mr); + + // grouped into lists + auto tdigest = + cudf::make_lists_column(num_rows, std::move(tdigest_offsets), std::move(tdigest_data), 0, {}); + + // create the final column + std::vector> children; + children.push_back(std::move(tdigest)); + children.push_back(std::move(min_values)); + children.push_back(std::move(max_values)); + return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - // mean/weight columns - std::vector> inner_children; - inner_children.push_back(make_empty_column(data_type(type_id::FLOAT64))); - inner_children.push_back(make_empty_column(data_type(type_id::FLOAT64))); - auto offsets = cudf::make_fixed_width_column( data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto list = - make_lists_column(1, - std::move(offsets), - cudf::make_structs_column(0, std::move(inner_children), 0, {}, stream, mr), - 0, - {}); auto min_col = cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); @@ -301,22 +297,24 @@ std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, max_col->mutable_view().end(), 0); - std::vector> children; - children.push_back(std::move(list)); - children.push_back(std::move(min_col)); - children.push_back(std::move(max_col)); - - return make_structs_column(1, std::move(children), 0, {}, stream, mr); + return make_tdigest_column(1, + make_empty_column(data_type(type_id::FLOAT64)), + make_empty_column(data_type(type_id::FLOAT64)), + std::move(offsets), + std::move(min_col), + std::move(max_col), + stream, + mr); } } // namespace tdigest. -std::unique_ptr percentile_approx(structs_column_view const& input, +std::unique_ptr percentile_approx(tdigest_column_view const& input, column_view const& percentiles, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - tdigest::check_is_valid_tdigest_column(input); + tdigest_column_view tdv(input); CUDF_EXPECTS(percentiles.type().id() == type_id::FLOAT64, "percentile_approx expects float64 percentile inputs"); @@ -341,24 +339,20 @@ std::unique_ptr percentile_approx(structs_column_view const& input, // if any of the input digests are empty, nullify the corresponding output rows (values will be // uninitialized) - auto [bitmask, null_count] = [stream, mr, input]() { - lists_column_view lcv(input.child(tdigest::centroid_column_index)); - auto iter = cudf::detail::make_counting_transform_iterator( - 0, [offsets = lcv.offsets().begin()] __device__(size_type index) { - return offsets[index + 1] - offsets[index] == 0 ? 1 : 0; - }); - auto const null_count = thrust::reduce(rmm::exec_policy(stream), iter, iter + input.size(), 0); + auto [bitmask, null_count] = [stream, mr, &tdv]() { + auto tdigest_is_empty = thrust::make_transform_iterator( + tdv.size_begin(), + [] __device__(size_type tdigest_size) -> size_type { return tdigest_size == 0; }); + auto const null_count = + thrust::reduce(rmm::exec_policy(stream), tdigest_is_empty, tdigest_is_empty + tdv.size(), 0); if (null_count == 0) { return std::pair{rmm::device_buffer{}, null_count}; } - return cudf::detail::valid_if( - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(0) + input.size(), - [offsets = lcv.offsets().begin()] __device__(size_type index) { - return offsets[index + 1] - offsets[index] == 0 ? 0 : 1; - }, - stream, - mr); + return cudf::detail::valid_if(tdigest_is_empty, + tdigest_is_empty + tdv.size(), + thrust::logical_not{}, + stream, + mr); }(); return cudf::make_lists_column( @@ -373,7 +367,7 @@ std::unique_ptr percentile_approx(structs_column_view const& input, } // namespace detail -std::unique_ptr percentile_approx(structs_column_view const& input, +std::unique_ptr percentile_approx(tdigest_column_view const& input, column_view const& percentiles, rmm::mr::device_memory_resource* mr) { diff --git a/cpp/src/quantiles/tdigest/tdigest_column_view.cpp b/cpp/src/quantiles/tdigest/tdigest_column_view.cpp new file mode 100644 index 00000000000..a86b40fd64a --- /dev/null +++ b/cpp/src/quantiles/tdigest/tdigest_column_view.cpp @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +namespace cudf { +namespace tdigest { + +using namespace cudf; + +tdigest_column_view::tdigest_column_view(column_view const& col) : column_view(col) +{ + // sanity check that this is actually tdigest data + CUDF_EXPECTS(col.type().id() == type_id::STRUCT, "Encountered invalid tdigest column"); + CUDF_EXPECTS(col.size() > 0, "tdigest columns must have > 0 rows"); + CUDF_EXPECTS(col.offset() == 0, "Encountered a sliced tdigest column"); + CUDF_EXPECTS(col.nullable() == false, "Encountered nullable tdigest column"); + + structs_column_view scv(col); + CUDF_EXPECTS(scv.num_children() == 3, "Encountered invalid tdigest column"); + CUDF_EXPECTS(scv.child(min_column_index).type().id() == type_id::FLOAT64, + "Encountered invalid tdigest column"); + CUDF_EXPECTS(scv.child(max_column_index).type().id() == type_id::FLOAT64, + "Encountered invalid tdigest column"); + + lists_column_view lcv(scv.child(centroid_column_index)); + auto data = lcv.child(); + CUDF_EXPECTS(data.type().id() == type_id::STRUCT, "Encountered invalid tdigest column"); + CUDF_EXPECTS(data.num_children() == 2, + "Encountered tdigest column with an invalid number of children"); + auto mean = data.child(mean_column_index); + CUDF_EXPECTS(mean.type().id() == type_id::FLOAT64, "Encountered invalid tdigest mean column"); + auto weight = data.child(weight_column_index); + CUDF_EXPECTS(weight.type().id() == type_id::FLOAT64, "Encountered invalid tdigest weight column"); +} + +lists_column_view tdigest_column_view::centroids() const { return child(centroid_column_index); } + +column_view tdigest_column_view::means() const +{ + auto c = centroids(); + structs_column_view inner(c.parent().child(lists_column_view::child_column_index)); + return inner.child(mean_column_index); +} + +column_view tdigest_column_view::weights() const +{ + auto c = centroids(); + structs_column_view inner(c.parent().child(lists_column_view::child_column_index)); + return inner.child(weight_column_index); +} + +double const* tdigest_column_view::min_begin() const +{ + return child(min_column_index).begin(); +} + +double const* tdigest_column_view::max_begin() const +{ + return child(max_column_index).begin(); +} + +} // namespace tdigest +} // namespace cudf diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index 818999867c1..2cac931f7f5 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include @@ -44,6 +44,22 @@ struct TDigestAllTypes : public cudf::test::BaseFixture { }; TYPED_TEST_CASE(TDigestAllTypes, cudf::test::NumericTypes); +template +struct column_min { + __device__ double operator()(device_span vals) + { + return static_cast(*thrust::min_element(thrust::seq, vals.begin(), vals.end())); + } +}; + +template +struct column_max { + __device__ double operator()(device_span vals) + { + return static_cast(*thrust::max_element(thrust::seq, vals.begin(), vals.end())); + } +}; + struct tdigest_gen { template < typename T, @@ -69,15 +85,11 @@ struct tdigest_gen { } }; -void tdigest_sample_compare(column_view const& result, +void tdigest_sample_compare(cudf::tdigest::tdigest_column_view const& tdv, std::vector const& h_expected) { - cudf::detail::tdigest::check_is_valid_tdigest_column(result); - cudf::structs_column_view scv(result); - cudf::lists_column_view lcv(scv.child(cudf::detail::tdigest::centroid_column_index)); - cudf::structs_column_view tdigests(lcv.child()); - column_view result_mean = tdigests.child(cudf::detail::tdigest::mean_column_index); - column_view result_weight = tdigests.child(cudf::detail::tdigest::weight_column_index); + column_view result_mean = tdv.means(); + column_view result_weight = tdv.weights(); auto expected_mean = cudf::make_fixed_width_column( data_type{type_id::FLOAT64}, h_expected.size(), mask_state::UNALLOCATED); @@ -114,45 +126,95 @@ void tdigest_sample_compare(column_view const& result, } template -std::unique_ptr make_expected_tdigest(column_view const& mean, - column_view const& weight, - T min, - T max) +void tdigest_minmax_compare(cudf::tdigest::tdigest_column_view const& tdv, + column_view const& input_values) { - std::vector> inner_children; - inner_children.push_back(std::make_unique(mean)); - inner_children.push_back(std::make_unique(weight)); - // tdigest struct - auto tdigests = cudf::make_structs_column(mean.size(), std::move(inner_children), 0, {}); - - std::vector h_offsets{0, mean.size()}; - auto offsets = - cudf::make_fixed_width_column(data_type{type_id::INT32}, 2, mask_state::UNALLOCATED); - cudaMemcpy(offsets->mutable_view().begin(), - h_offsets.data(), - sizeof(offset_type) * 2, - cudaMemcpyHostToDevice); - - auto list = cudf::make_lists_column(1, std::move(offsets), std::move(tdigests), 0, {}); - - auto min_col = - cudf::make_fixed_width_column(data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED); - thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), - min_col->mutable_view().begin(), - min_col->mutable_view().end(), - static_cast(min)); - auto max_col = - cudf::make_fixed_width_column(data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED); - thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), - max_col->mutable_view().begin(), - max_col->mutable_view().end(), - static_cast(max)); - - std::vector> children; - children.push_back(std::move(list)); - children.push_back(std::move(min_col)); - children.push_back(std::move(max_col)); - return make_structs_column(1, std::move(children), 0, {}); + // verify min/max + thrust::host_vector> h_spans; + h_spans.push_back({input_values.begin(), static_cast(input_values.size())}); + thrust::device_vector> spans(h_spans); + + auto expected_min = cudf::make_fixed_width_column( + data_type{type_id::FLOAT64}, spans.size(), mask_state::UNALLOCATED); + thrust::transform(rmm::exec_policy(rmm::cuda_stream_default), + spans.begin(), + spans.end(), + expected_min->mutable_view().template begin(), + column_min{}); + column_view result_min(data_type{type_id::FLOAT64}, tdv.size(), tdv.min_begin()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result_min, *expected_min); + + auto expected_max = cudf::make_fixed_width_column( + data_type{type_id::FLOAT64}, spans.size(), mask_state::UNALLOCATED); + thrust::transform(rmm::exec_policy(rmm::cuda_stream_default), + spans.begin(), + spans.end(), + expected_max->mutable_view().template begin(), + column_max{}); + column_view result_max(data_type{type_id::FLOAT64}, tdv.size(), tdv.max_begin()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result_max, *expected_max); +} + +struct expected_tdigest { + column_view mean; + column_view weight; + double min, max; +}; + +std::unique_ptr make_expected_tdigest_column(std::vector const& groups) +{ + std::vector> tdigests; + + // make an individual digest + auto make_digest = [&](expected_tdigest const& tdigest) { + std::vector> inner_children; + inner_children.push_back(std::make_unique(tdigest.mean)); + inner_children.push_back(std::make_unique(tdigest.weight)); + // tdigest struct + auto tdigests = + cudf::make_structs_column(tdigest.mean.size(), std::move(inner_children), 0, {}); + + std::vector h_offsets{0, tdigest.mean.size()}; + auto offsets = + cudf::make_fixed_width_column(data_type{type_id::INT32}, 2, mask_state::UNALLOCATED); + cudaMemcpy(offsets->mutable_view().begin(), + h_offsets.data(), + sizeof(offset_type) * 2, + cudaMemcpyHostToDevice); + + auto list = cudf::make_lists_column(1, std::move(offsets), std::move(tdigests), 0, {}); + + auto min_col = + cudf::make_fixed_width_column(data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED); + thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), + min_col->mutable_view().begin(), + min_col->mutable_view().end(), + tdigest.min); + auto max_col = + cudf::make_fixed_width_column(data_type{type_id::FLOAT64}, 1, mask_state::UNALLOCATED); + thrust::fill(rmm::exec_policy(rmm::cuda_stream_default), + max_col->mutable_view().begin(), + max_col->mutable_view().end(), + tdigest.max); + + std::vector> children; + children.push_back(std::move(list)); + children.push_back(std::move(min_col)); + children.push_back(std::move(max_col)); + return make_structs_column(1, std::move(children), 0, {}); + }; + + // build the individual digests + std::transform(groups.begin(), groups.end(), std::back_inserter(tdigests), make_digest); + + // concatenate them + std::vector views; + std::transform(tdigests.begin(), + tdigests.end(), + std::back_inserter(views), + [](std::unique_ptr const& c) { return c->view(); }); + + return cudf::concatenate(views); } TYPED_TEST(TDigestAllTypes, Simple) @@ -172,7 +234,10 @@ TYPED_TEST(TDigestAllTypes, Simple) auto mean = cudf::cast(raw_mean, data_type{type_id::FLOAT64}); double const min = 1; double const max = 126; - auto expected = make_expected_tdigest(*mean, weight, static_cast(min), static_cast(max)); + auto expected = make_expected_tdigest_column({{*mean, + weight, + static_cast(static_cast(min)), + static_cast(static_cast(max))}}); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -195,7 +260,10 @@ TYPED_TEST(TDigestAllTypes, SimpleWithNulls) auto mean = cudf::cast(raw_mean, data_type{type_id::FLOAT64}); double const min = 1; double const max = 122; - auto expected = make_expected_tdigest(*mean, weight, static_cast(min), static_cast(max)); + auto expected = make_expected_tdigest_column({{*mean, + weight, + static_cast(static_cast(min)), + static_cast(static_cast(max))}}); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -288,6 +356,30 @@ TYPED_TEST(TDigestAllTypes, LargeGroups) struct TDigestTest : public cudf::test::BaseFixture { }; +TEST_F(TDigestTest, EmptyMixed) +{ + cudf::test::fixed_width_column_wrapper values{{123456.78, 10.0, 20.0, 30.0}, + {1, 0, 1, 0}}; + cudf::test::strings_column_wrapper keys{"b", "a", "c", "d"}; + + auto const delta = 1000; + cudf::table_view t({keys}); + cudf::groupby::groupby gb(t); + std::vector requests; + std::vector> aggregations; + aggregations.push_back(cudf::make_tdigest_aggregation(delta)); + requests.push_back({values, std::move(aggregations)}); + auto result = gb.aggregate(requests); + + using FCW = cudf::test::fixed_width_column_wrapper; + auto expected = make_expected_tdigest_column({{FCW{}, FCW{}, 0, 0}, + {FCW{123456.78}, FCW{1.0}, 123456.78, 123456.78}, + {FCW{20.0}, FCW{1.0}, 20.0, 20.0}, + {FCW{}, FCW{}, 0, 0}}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result.second[0].results[0], *expected); +} + TEST_F(TDigestTest, LargeInputDouble) { // these tests are being done explicitly because of the way we have to precompute the correct @@ -322,8 +414,12 @@ TEST_F(TDigestTest, LargeInputDouble) {409, 94.07685720279611985006, 1272}, {491, 99.94197663121231300920, 130}, {500, 99.99969880795092080916, 2}}; + cudf::tdigest::tdigest_column_view tdv(*result); + + tdigest_sample_compare(tdv, expected); - tdigest_sample_compare(*result, expected); + // verify min/max + tdigest_minmax_compare(tdv, *values); } // delta 100 @@ -338,8 +434,12 @@ TEST_F(TDigestTest, LargeInputDouble) {38, 90.61229683516096145013, 15581}, {46, 99.07283498858802772702, 5142}, {50, 99.99970905482754801596, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); + + tdigest_sample_compare(tdv, expected); - tdigest_sample_compare(*result, expected); + // verify min/max + tdigest_minmax_compare(tdv, *values); } // delta 10 @@ -353,8 +453,12 @@ TEST_F(TDigestTest, LargeInputDouble) {3, 83.46216572053654658703, 187500}, {4, 96.42204425201593664951, 71620}, {5, 99.99970905482754801596, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); - tdigest_sample_compare(*result, expected); + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *values); } } @@ -395,8 +499,12 @@ TEST_F(TDigestTest, LargeInputInt) {418, 95, 1157}, {479, 99, 307}, {500, 99, 2}}; + cudf::tdigest::tdigest_column_view tdv(*result); + + tdigest_sample_compare(tdv, expected); - tdigest_sample_compare(*result, expected); + // verify min/max + tdigest_minmax_compare(tdv, *values); } // delta 100 @@ -411,8 +519,12 @@ TEST_F(TDigestTest, LargeInputInt) {38, 90.14209614273795523332, 15581}, {46, 98.64041229093737683797, 5142}, {50, 99, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); - tdigest_sample_compare(*result, expected); + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *values); } // delta 10 @@ -426,8 +538,12 @@ TEST_F(TDigestTest, LargeInputInt) {3, 82.96355733333332693746, 187500}, {4, 95.91280368612116546956, 71620}, {5, 99, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); + + tdigest_sample_compare(tdv, expected); - tdigest_sample_compare(*result, expected); + // verify min/max + tdigest_minmax_compare(tdv, *values); } } @@ -441,6 +557,7 @@ TEST_F(TDigestTest, LargeInputDecimal) // decimal, int, bool) auto values = generate_standardized_percentile_distribution(data_type{type_id::DECIMAL32, -4}); + auto cast_values = cudf::cast(*values, data_type{type_id::FLOAT64}); // all in the same group auto keys = cudf::make_fixed_width_column( data_type{type_id::INT32}, values->size(), mask_state::UNALLOCATED); @@ -465,8 +582,12 @@ TEST_F(TDigestTest, LargeInputDecimal) {409, 94.07680636792450457051, 1272}, {491, 99.94192461538463589932, 130}, {500, 99.99965000000000259206, 2}}; + cudf::tdigest::tdigest_column_view tdv(*result); - tdigest_sample_compare(*result, expected); + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *cast_values); } // delta 100 @@ -481,8 +602,12 @@ TEST_F(TDigestTest, LargeInputDecimal) {38, 90.61224673640975879607, 15581}, {46, 99.07278498638662256326, 5142}, {50, 99.99970000000000425189, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); + + tdigest_sample_compare(tdv, expected); - tdigest_sample_compare(*result, expected); + // verify min/max + tdigest_minmax_compare(tdv, *cast_values); } // delta 10 @@ -496,8 +621,12 @@ TEST_F(TDigestTest, LargeInputDecimal) {3, 83.46211575573336460820, 187500}, {4, 96.42199425300195514410, 71620}, {5, 99.99970000000000425189, 1}}; + cudf::tdigest::tdigest_column_view tdv(*result); - tdigest_sample_compare(*result, expected); + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *cast_values); } } @@ -562,6 +691,9 @@ TEST_F(TDigestMergeTest, Simple) requests.push_back({*merge_input, std::move(aggregations)}); auto result = gb.aggregate(requests); + cudf::tdigest::tdigest_column_view tdv(*result.second[0].results[0]); + + // verify centroids std::vector expected{{0, 0.00013945158577498588, 2}, {10, 0.04804393446447510763, 50}, {59, 1.68846964439246893797, 284}, @@ -575,10 +707,278 @@ TEST_F(TDigestMergeTest, Simple) {625, 98.20470345147104751504, 405}, {700, 99.96818381983835877236, 56}, {711, 99.99970905482754801596, 1}}; + tdigest_sample_compare(tdv, expected); + + // verify min/max + tdigest_minmax_compare(tdv, *values); + } +} + +struct key_groups { + __device__ size_type operator()(size_type i) { return i < 250000 ? 0 : 1; } +}; +TEST_F(TDigestMergeTest, Grouped) +{ + auto values = generate_standardized_percentile_distribution(data_type{type_id::FLOAT64}); + CUDF_EXPECTS(values->size() == 750000, "Unexpected distribution size"); + // all in the same group + auto keys = cudf::make_fixed_width_column( + data_type{type_id::INT32}, values->size(), mask_state::UNALLOCATED); + // 3 groups. 0-250000 in group 0. 250000-500000 in group 1 and 500000-750000 in group 1 + auto key_iter = cudf::detail::make_counting_transform_iterator(0, key_groups{}); + thrust::copy(rmm::exec_policy(rmm::cuda_stream_default), + key_iter, + key_iter + keys->size(), + keys->mutable_view().template begin()); + + auto split_values = cudf::split(*values, {250000, 500000}); + auto grouped_split_values = cudf::split(*values, {250000}); + auto split_keys = cudf::split(*keys, {250000, 500000}); + + int const delta = 1000; + + // generate seperate digests + std::vector> parts; + auto iter = thrust::make_counting_iterator(0); + std::transform( + iter, + iter + split_values.size(), + std::back_inserter(parts), + [&split_keys, &split_values, delta](int i) { + cudf::table_view t({split_keys[i]}); + cudf::groupby::groupby gb(t); + std::vector requests; + std::vector> aggregations; + aggregations.push_back(cudf::make_tdigest_aggregation(delta)); + requests.push_back({split_values[i], std::move(aggregations)}); + auto result = gb.aggregate(requests); + return std::move(result.second[0].results[0]); + }); + std::vector part_views; + std::transform(parts.begin(), + parts.end(), + std::back_inserter(part_views), + [](std::unique_ptr const& col) { return col->view(); }); + + // merge delta = 1000 + { + int const merge_delta = 1000; + + // merge them + auto merge_input = cudf::concatenate(part_views); + cudf::test::fixed_width_column_wrapper merge_keys{0, 1, 1}; + cudf::table_view key_table({merge_keys}); + cudf::groupby::groupby gb(key_table); + std::vector requests; + std::vector> aggregations; + aggregations.push_back( + cudf::make_merge_tdigest_aggregation(merge_delta)); + requests.push_back({*merge_input, std::move(aggregations)}); + auto result = gb.aggregate(requests); + + CUDF_EXPECTS(result.second[0].results[0]->size() == 2, "Unexpected tdigest merge result size"); + cudf::tdigest::tdigest_column_view tdv(*result.second[0].results[0]); + + // verify centroids + std::vector expected{// group 0 + {0, 0.00013945158577498588, 2}, + {10, 0.04804393446447509375, 50}, + {66, 2.10089484962640948851, 316}, + {139, 8.92977366346101852912, 601}, + {243, 23.89152910016953867967, 784}, + {366, 41.62636569363655780762, 586}, + {432, 47.73085102980330418632, 326}, + {460, 49.20637897385523018556, 196}, + {501, 49.99998311512171511595, 1}, + // group 1 + {502 + 0, 50.00022508669655252334, 2}, + {502 + 15, 50.05415694538910287292, 74}, + {502 + 70, 51.21421484112906341579, 334}, + {502 + 150, 55.19367617848146778670, 635}, + {502 + 260, 63.24605285552920008740, 783}, + {502 + 380, 76.99522005804017510400, 1289}, + {502 + 440, 84.22673817294192133431, 758}, + {502 + 490, 88.11787981529532487457, 784}, + {502 + 555, 93.02766411136053648079, 704}, + {502 + 618, 96.91486035315536184953, 516}, + {502 + 710, 99.87755861436669135855, 110}, + {502 + 733, 99.99970905482754801596, 1}}; + tdigest_sample_compare(tdv, expected); + + // verify min/max + auto split_results = cudf::split(*result.second[0].results[0], {1}); + auto iter = thrust::make_counting_iterator(0); + std::for_each(iter, iter + split_results.size(), [&](size_type i) { + auto copied = std::make_unique(split_results[i]); + tdigest_minmax_compare(cudf::tdigest::tdigest_column_view(*copied), + grouped_split_values[i]); + }); + } + + // merge delta = 100 + { + int const merge_delta = 100; + + // merge them + auto merge_input = cudf::concatenate(part_views); + cudf::test::fixed_width_column_wrapper merge_keys{0, 1, 1}; + cudf::table_view key_table({merge_keys}); + cudf::groupby::groupby gb(key_table); + std::vector requests; + std::vector> aggregations; + aggregations.push_back( + cudf::make_merge_tdigest_aggregation(merge_delta)); + requests.push_back({*merge_input, std::move(aggregations)}); + auto result = gb.aggregate(requests); + + CUDF_EXPECTS(result.second[0].results[0]->size() == 2, "Unexpected tdigest merge result size"); + cudf::tdigest::tdigest_column_view tdv(*result.second[0].results[0]); + + // verify centroids + std::vector expected{// group 0 + {0, 0.02182479870203561656, 231}, + {3, 0.60625795002234528219, 1688}, + {13, 8.40462931740497687372, 5867}, + {27, 28.79997783486397722186, 7757}, + {35, 40.22391421196020644402, 6224}, + {45, 48.96506331299028857984, 2225}, + {50, 49.99979491345574444949, 4}, + // group 1 + {51 + 0, 50.02171921312970681583, 460}, + {51 + 5, 51.45308398121498072442, 5074}, + {51 + 11, 55.96880716301625113829, 10011}, + {51 + 22, 70.18029861315150697010, 15351}, + {51 + 38, 92.65943436519887654867, 10718}, + {51 + 47, 99.27745505225347244505, 3639}}; + tdigest_sample_compare(tdv, expected); + + // verify min/max + auto split_results = cudf::split(*result.second[0].results[0], {1}); + auto iter = thrust::make_counting_iterator(0); + std::for_each(iter, iter + split_results.size(), [&](size_type i) { + auto copied = std::make_unique(split_results[i]); + tdigest_minmax_compare(cudf::tdigest::tdigest_column_view(*copied), + grouped_split_values[i]); + }); + } + + // merge delta = 10 + { + int const merge_delta = 10; + + // merge them + auto merge_input = cudf::concatenate(part_views); + cudf::test::fixed_width_column_wrapper merge_keys{0, 1, 1}; + cudf::table_view key_table({merge_keys}); + cudf::groupby::groupby gb(key_table); + std::vector requests; + std::vector> aggregations; + aggregations.push_back( + cudf::make_merge_tdigest_aggregation(merge_delta)); + requests.push_back({*merge_input, std::move(aggregations)}); + auto result = gb.aggregate(requests); - tdigest_sample_compare(*result.second[0].results[0], expected); + CUDF_EXPECTS(result.second[0].results[0]->size() == 2, "Unexpected tdigest merge result size"); + cudf::tdigest::tdigest_column_view tdv(*result.second[0].results[0]); + + // verify centroids + std::vector expected{// group 0 + {0, 2.34644806683495144028, 23623}, + {1, 10.95523693698660672169, 62290}, + {2, 24.90731657803452847588, 77208}, + {3, 38.88062495289155862110, 62658}, + {4, 47.56288303840698006297, 24217}, + {5, 49.99979491345574444949, 4}, + // group 1 + {6 + 0, 52.40174463129091719793, 47410}, + {6 + 1, 60.97025126481504031517, 124564}, + {6 + 2, 74.91722742839780835311, 154387}, + {6 + 3, 88.87559489177009197647, 124810}, + {6 + 4, 97.55823307073454486726, 48817}, + {6 + 5, 99.99901807905750672489, 12}}; + tdigest_sample_compare(tdv, expected); + + // verify min/max + auto split_results = cudf::split(*result.second[0].results[0], {1}); + auto iter = thrust::make_counting_iterator(0); + std::for_each(iter, iter + split_results.size(), [&](size_type i) { + auto copied = std::make_unique(split_results[i]); + tdigest_minmax_compare(cudf::tdigest::tdigest_column_view(*copied), + grouped_split_values[i]); + }); } } +TEST_F(TDigestMergeTest, Empty) +{ + // 3 empty tdigests all in the same group + auto a = cudf::detail::tdigest::make_empty_tdigest_column(); + auto b = cudf::detail::tdigest::make_empty_tdigest_column(); + auto c = cudf::detail::tdigest::make_empty_tdigest_column(); + std::vector cols; + cols.push_back(*a); + cols.push_back(*b); + cols.push_back(*c); + auto values = cudf::concatenate(cols); + cudf::test::fixed_width_column_wrapper keys{0, 0, 0}; + + auto const delta = 1000; + cudf::table_view t({keys}); + cudf::groupby::groupby gb(t); + std::vector requests; + std::vector> aggregations; + aggregations.push_back(cudf::make_merge_tdigest_aggregation(delta)); + requests.push_back({*values, std::move(aggregations)}); + auto result = gb.aggregate(requests); + + auto expected = cudf::detail::tdigest::make_empty_tdigest_column(); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); +} + +TEST_F(TDigestMergeTest, EmptyGroups) +{ + cudf::test::fixed_width_column_wrapper values_b{126, 15, 1, 99, 67, 55, 2}; + cudf::test::fixed_width_column_wrapper values_d{100, 200, 300, 400, 500, 600, 700}; + cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; + int const delta = 1000; + + auto a = cudf::detail::tdigest::make_empty_tdigest_column(); + auto b = cudf::type_dispatcher( + static_cast(values_b).type(), tdigest_gen{}, keys, values_b, delta); + auto c = cudf::detail::tdigest::make_empty_tdigest_column(); + auto d = cudf::type_dispatcher( + static_cast(values_d).type(), tdigest_gen{}, keys, values_d, delta); + auto e = cudf::detail::tdigest::make_empty_tdigest_column(); + + std::vector cols; + cols.push_back(*a); + cols.push_back(*b); + cols.push_back(*c); + cols.push_back(*d); + cols.push_back(*e); + auto values = cudf::concatenate(cols); + + cudf::test::fixed_width_column_wrapper merge_keys{0, 0, 1, 0, 2}; + + cudf::table_view t({merge_keys}); + cudf::groupby::groupby gb(t); + std::vector requests; + std::vector> aggregations; + aggregations.push_back(cudf::make_merge_tdigest_aggregation(delta)); + requests.push_back({*values, std::move(aggregations)}); + auto result = gb.aggregate(requests); + + using FCW = cudf::test::fixed_width_column_wrapper; + cudf::test::fixed_width_column_wrapper expected_means{ + 1, 2, 15, 55, 67, 99, 100, 126, 200, 300, 400, 500, 600, 700}; + cudf::test::fixed_width_column_wrapper expected_weights{ + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + auto expected = make_expected_tdigest_column( + {{expected_means, expected_weights, 1, 700}, {FCW{}, FCW{}, 0, 0}, {FCW{}, FCW{}, 0, 0}}); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); +} + } // namespace test } // namespace cudf diff --git a/cpp/tests/quantiles/percentile_approx_test.cu b/cpp/tests/quantiles/percentile_approx_test.cu index 2b19699d870..ec3390fdfe5 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cu +++ b/cpp/tests/quantiles/percentile_approx_test.cu @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,7 @@ #include using namespace cudf; +using namespace cudf::tdigest; struct tdigest_gen { template < @@ -116,8 +118,8 @@ struct percentile_approx_dispatch { cudf::test::fixed_width_column_wrapper g_percentages(percentages.begin(), percentages.end()); - structs_column_view scv(*(gb_result.second[0].results[0])); - auto result = cudf::percentile_approx(scv, g_percentages); + tdigest_column_view tdv(*(gb_result.second[0].results[0])); + auto result = cudf::percentile_approx(tdv, g_percentages); cudf::test::expect_columns_equivalent( *expected, *result, cudf::test::debug_output_level::FIRST_ERROR, ulps); @@ -194,8 +196,8 @@ void percentile_approx_test(column_view const& _keys, cudf::test::fixed_width_column_wrapper g_percentages(percentages.begin(), percentages.end()); - structs_column_view scv(*(gb_result.second[0].results[0])); - auto result = cudf::percentile_approx(scv, g_percentages); + tdigest_column_view tdv(*(gb_result.second[0].results[0])); + auto result = cudf::percentile_approx(tdv, g_percentages); CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*expected, *result); } @@ -364,8 +366,8 @@ TEST_F(PercentileApproxTest, EmptyInput) input.push_back(*empty_); auto empty = cudf::concatenate(input); - structs_column_view scv(*empty); - auto result = cudf::percentile_approx(scv, percentiles); + tdigest_column_view tdv(*empty); + auto result = cudf::percentile_approx(tdv, percentiles); cudf::test::fixed_width_column_wrapper offsets{0, 0, 0, 0}; std::vector nulls{0, 0, 0}; @@ -395,8 +397,8 @@ TEST_F(PercentileApproxTest, EmptyPercentiles) cudf::test::fixed_width_column_wrapper percentiles{}; - structs_column_view scv(*tdigest_column.second[0].results[0]); - auto result = cudf::percentile_approx(scv, percentiles); + tdigest_column_view tdv(*tdigest_column.second[0].results[0]); + auto result = cudf::percentile_approx(tdv, percentiles); cudf::test::fixed_width_column_wrapper offsets{0, 0, 0}; auto expected = cudf::make_lists_column(2, @@ -422,10 +424,10 @@ TEST_F(PercentileApproxTest, NullPercentiles) requests.push_back({values, std::move(aggregations)}); auto tdigest_column = gb.aggregate(requests); - structs_column_view scv(*tdigest_column.second[0].results[0]); + tdigest_column_view tdv(*tdigest_column.second[0].results[0]); cudf::test::fixed_width_column_wrapper npercentiles{{0.5, 0.5, 1.0, 1.0}, {0, 0, 1, 1}}; - auto result = cudf::percentile_approx(scv, npercentiles); + auto result = cudf::percentile_approx(tdv, npercentiles); std::vector valids{0, 0, 1, 1}; cudf::test::lists_column_wrapper expected{{{99, 99, 4, 4}, valids.begin()}, diff --git a/java/src/main/native/src/ColumnViewJni.cpp b/java/src/main/native/src/ColumnViewJni.cpp index cd5ff073edd..f95b05d5aeb 100644 --- a/java/src/main/native/src/ColumnViewJni.cpp +++ b/java/src/main/native/src/ColumnViewJni.cpp @@ -61,7 +61,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -295,14 +297,13 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ColumnView_approxPercentile(JNIEnv * JNI_NULL_CHECK(env, input_column, "input_column native handle is null", 0); JNI_NULL_CHECK(env, percentiles_column, "percentiles_column native handle is null", 0); try { - cudf::jni::auto_set_device(env); - cudf::column_view *n_input_column = reinterpret_cast(input_column); - std::unique_ptr input_view = - std::make_unique(*n_input_column); - cudf::column_view *n_percentiles_column = - reinterpret_cast(percentiles_column); - std::unique_ptr result = - cudf::percentile_approx(*input_view, *n_percentiles_column); + using namespace cudf; + using tdigest_column_view = cudf::tdigest::tdigest_column_view; + jni::auto_set_device(env); + auto const tdigest_view = + tdigest_column_view{structs_column_view{*reinterpret_cast(input_column)}}; + auto const p_percentiles = reinterpret_cast(percentiles_column); + auto result = percentile_approx(tdigest_view, *p_percentiles); return reinterpret_cast(result.release()); } CATCH_STD(env, 0);