From 120aa62decc7a23919a9b669dbb49f63a698b47d Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Thu, 6 Jan 2022 16:37:31 -0600 Subject: [PATCH] Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. (#9931) Fixes https://github.com/NVIDIA/spark-rapids/issues/4060 Issue was relatively straightforward. There is a section of code in the bucket generation step that detects "gaps" that would be generated during the reduction step. It was incorrectly indexing into the list of cumulative weights for input values. Fundamental change was to change the `TotalWeightIter` iterator which was just returning the total weight for an input group into a `GroupInfoFunc` functor that returns total weight as well as group size info that is used to index cumulative weights correctly. Authors: - https://github.com/nvdbaranec Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - Jake Hemstad (https://github.com/jrhemstad) URL: https://github.com/rapidsai/cudf/pull/9931 --- cpp/src/groupby/sort/group_tdigest.cu | 154 +++++++++++++++----------- 1 file changed, 92 insertions(+), 62 deletions(-) diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index ecb18c09f9d..b7b45341ad2 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -101,10 +101,14 @@ struct merge_centroids { * nearest whole number <= it is floor(3.56) == 3. */ struct nearest_value_scalar_weights { - thrust::pair operator() __device__(double next_limit, size_type) + offset_type const* group_offsets; + + thrust::pair operator() __device__(double next_limit, size_type group_index) { - double const f = floor(next_limit); - return {f, max(0, static_cast(next_limit) - 1)}; + double const f = floor(next_limit); + auto const relative_weight_index = max(0, static_cast(next_limit) - 1); + auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index]; + return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1}; } }; @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights { group_cumulative_weights); return index == 0 ? thrust::pair{0, 0} - : thrust::pair{group_cumulative_weights[index - 1], index - 1}; + : thrust::pair{group_cumulative_weights[index - 1], + static_cast(index) - 1}; } }; @@ -187,6 +192,39 @@ struct cumulative_centroid_weight { } }; +// retrieve group info of scalar inputs by group index +struct scalar_group_info { + size_type const* group_valid_counts; + offset_type const* group_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + return {static_cast(group_valid_counts[group_index]), + group_offsets[group_index + 1] - group_offsets[group_index], + group_offsets[group_index]}; + } +}; + +// retrieve group info of centroid inputs by group index +struct centroid_group_info { + double const* cumulative_weights; + offset_type const* outer_offsets; + offset_type const* inner_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + // if there's no weights in this group of digests at all, return 0. + auto const group_start = inner_offsets[outer_offsets[group_index]]; + auto const group_end = inner_offsets[outer_offsets[group_index + 1]]; + auto const num_weights = group_end - group_start; + auto const last_weight_index = group_end - 1; + return num_weights == 0 + ? thrust::tuple{0, num_weights, group_start} + : thrust::tuple{ + cumulative_weights[last_weight_index], num_weights, group_start}; + } +}; + struct tdigest_min { __device__ double operator()(thrust::tuple const& t) { @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm) * cluster sizes and total # of clusters, and once to compute the actual * weight limits per cluster. * - * @param delta_ tdigest compression level + * @param delta tdigest compression level * @param num_groups The number of input groups - * @param nearest_weight_ A functor which returns the nearest weight in the input + * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight_ A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total + * weight, size and start offset) * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters * @param has_nulls Whether or not the input contains nulls * */ -template -__global__ void generate_cluster_limits_kernel(int delta_, + +template +__global__ void generate_cluster_limits_kernel(int delta, size_type num_groups, NearestWeightFunc nearest_weight, - TotalWeightIter total_weight_, + GroupInfo group_info, CumulativeWeight cumulative_weight, double* group_cluster_wl, size_type* group_num_clusters, offset_type const* group_cluster_offsets, bool has_nulls) { - int const tid = threadIdx.x + blockIdx.x * blockDim.x; + int const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const group_index = tid; if (group_index >= num_groups) { return; } // we will generate at most delta clusters. - double const delta = static_cast(delta_); - double const delta_norm = delta / (2.0 * M_PI); - double const total_weight = total_weight_[group_index]; + double const delta_norm = static_cast(delta) / (2.0 * M_PI); + double total_weight; + size_type group_size, group_start; + thrust::tie(total_weight, group_size, group_start) = group_info(group_index); // start at the correct place based on our cluster offset. double* cluster_wl = @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_, double cur_limit = 0.0; double cur_weight = 0.0; double next_limit = -1.0; - int last_inserted_index = -1; + int last_inserted_index = -1; // group-relative index into the input stream // compute the first cluster limit double nearest_w; - int nearest_w_index; + int nearest_w_index; // group-relative index into the input stream while (1) { cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w); if (cur_weight >= total_weight) { break; } @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_, // during the reduction step to be trivial. // double adjusted_next_limit = next_limit; - if (nearest_w_index == last_inserted_index || last_inserted_index < 0) { - nearest_w_index = last_inserted_index + 1; - auto [r, i, adjusted] = cumulative_weight(nearest_w_index); - adjusted_next_limit = max(next_limit, adjusted); - (void)r; - (void)i; + if ((last_inserted_index < 0) || // if we haven't inserted anything yet + (nearest_w_index == + last_inserted_index)) { // if we land in the same bucket as the previous cap + + // force the value into this bucket + nearest_w_index = + (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1; + + // the "adjusted" weight must be high enough so that this value will fall in the bucket. + // NOTE: cumulative_weight expects an absolute index into the input value stream, not a + // group-relative index + [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start); + adjusted_next_limit = max(next_limit, adjusted); } cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; last_inserted_index = nearest_w_index; @@ -360,8 +408,8 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @param num_groups The number of input groups * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total weight, + * size and start offset) * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory @@ -369,12 +417,12 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @returns A tuple containing the set of cluster weight limits for each group, a set of * list-style offsets indicating group sizes, and the total number of clusters */ -template +template std::tuple, std::unique_ptr, size_type> generate_group_cluster_info(int delta, size_type num_groups, NearestWeight nearest_weight, - TotalWeightIter total_weight, + GroupInfo group_info, CumulativeWeight cumulative_weight, bool has_nulls, rmm::cuda_stream_view stream, @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, nullptr, group_num_clusters.begin(), @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, group_cluster_wl.begin(), group_num_clusters.begin(), @@ -583,9 +631,8 @@ std::unique_ptr compute_tdigests(int delta, group_cluster_offsets = group_cluster_offsets->view().begin(), group_cumulative_weight] __device__(size_type value_index) -> size_type { // get group index, relative value index within the group and cumulative weight. - auto [group_index, relative_value_index, cumulative_weight] = + [[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] = group_cumulative_weight(value_index); - (void)relative_value_index; auto const num_clusters = group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index]; @@ -616,8 +663,9 @@ std::unique_ptr compute_tdigests(int delta, cudf::mutable_column_view weight_col(*centroid_weights); // reduce the centroids into the clusters - auto output = thrust::make_zip_iterator(thrust::make_tuple( + auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); + auto const num_values = std::distance(centroids_begin, centroids_end); thrust::reduce_by_key(rmm::exec_policy(stream), keys, @@ -640,12 +688,6 @@ std::unique_ptr compute_tdigests(int delta, mr); } -// retrieve total weight of scalar inputs by group index -struct scalar_total_weight { - size_type const* group_valid_counts; - __device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; } -}; - // return the min/max value of scalar inputs by group index template struct get_scalar_minmax { @@ -678,17 +720,15 @@ struct typed_group_tdigest { rmm::mr::device_memory_resource* mr) { // first, generate cluster weight information for each input group - auto total_weight = cudf::detail::make_counting_transform_iterator( - 0, scalar_total_weight{group_valid_counts.begin()}); - auto [group_cluster_wl, group_cluster_offsets, total_clusters] = - generate_group_cluster_info(delta, - num_groups, - nearest_value_scalar_weights{}, - total_weight, - cumulative_scalar_weight{group_offsets, group_labels}, - col.null_count() > 0, - stream, - mr); + auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( + delta, + num_groups, + nearest_value_scalar_weights{group_offsets.begin()}, + scalar_group_info{group_valid_counts.begin(), group_offsets.begin()}, + cumulative_scalar_weight{group_offsets, group_labels}, + col.null_count() > 0, + stream, + mr); // device column view. handy because the .element() function // automatically handles fixed-point conversions for us @@ -927,25 +967,15 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto const delta = max_centroids; // generate cluster info - auto total_group_weight = cudf::detail::make_counting_transform_iterator( - 0, - [outer_offsets = group_offsets.data(), - inner_offsets = tdigest_offsets.begin(), - cumulative_weights = - cumulative_weights->view().begin()] __device__(size_type group_index) -> double { - // if there's no weights in this group of digests at all, return 0. - auto const num_weights = - inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; - auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; - return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; - }); auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, num_groups, nearest_value_centroid_weights{cumulative_weights->view().begin(), group_offsets.data(), tdigest_offsets.begin()}, - total_group_weight, + centroid_group_info{cumulative_weights->view().begin(), + group_offsets.data(), + tdigest_offsets.begin()}, cumulative_centroid_weight{ cumulative_weights->view().begin(), group_labels,