diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu index ecb18c09f9d..b7b45341ad2 100644 --- a/cpp/src/groupby/sort/group_tdigest.cu +++ b/cpp/src/groupby/sort/group_tdigest.cu @@ -101,10 +101,14 @@ struct merge_centroids { * nearest whole number <= it is floor(3.56) == 3. */ struct nearest_value_scalar_weights { - thrust::pair operator() __device__(double next_limit, size_type) + offset_type const* group_offsets; + + thrust::pair operator() __device__(double next_limit, size_type group_index) { - double const f = floor(next_limit); - return {f, max(0, static_cast(next_limit) - 1)}; + double const f = floor(next_limit); + auto const relative_weight_index = max(0, static_cast(next_limit) - 1); + auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index]; + return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1}; } }; @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights { group_cumulative_weights); return index == 0 ? thrust::pair{0, 0} - : thrust::pair{group_cumulative_weights[index - 1], index - 1}; + : thrust::pair{group_cumulative_weights[index - 1], + static_cast(index) - 1}; } }; @@ -187,6 +192,39 @@ struct cumulative_centroid_weight { } }; +// retrieve group info of scalar inputs by group index +struct scalar_group_info { + size_type const* group_valid_counts; + offset_type const* group_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + return {static_cast(group_valid_counts[group_index]), + group_offsets[group_index + 1] - group_offsets[group_index], + group_offsets[group_index]}; + } +}; + +// retrieve group info of centroid inputs by group index +struct centroid_group_info { + double const* cumulative_weights; + offset_type const* outer_offsets; + offset_type const* inner_offsets; + + __device__ thrust::tuple operator()(size_type group_index) + { + // if there's no weights in this group of digests at all, return 0. + auto const group_start = inner_offsets[outer_offsets[group_index]]; + auto const group_end = inner_offsets[outer_offsets[group_index + 1]]; + auto const num_weights = group_end - group_start; + auto const last_weight_index = group_end - 1; + return num_weights == 0 + ? thrust::tuple{0, num_weights, group_start} + : thrust::tuple{ + cumulative_weights[last_weight_index], num_weights, group_start}; + } +}; + struct tdigest_min { __device__ double operator()(thrust::tuple const& t) { @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm) * cluster sizes and total # of clusters, and once to compute the actual * weight limits per cluster. * - * @param delta_ tdigest compression level + * @param delta tdigest compression level * @param num_groups The number of input groups - * @param nearest_weight_ A functor which returns the nearest weight in the input + * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight_ A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total + * weight, size and start offset) * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters * @param has_nulls Whether or not the input contains nulls * */ -template -__global__ void generate_cluster_limits_kernel(int delta_, + +template +__global__ void generate_cluster_limits_kernel(int delta, size_type num_groups, NearestWeightFunc nearest_weight, - TotalWeightIter total_weight_, + GroupInfo group_info, CumulativeWeight cumulative_weight, double* group_cluster_wl, size_type* group_num_clusters, offset_type const* group_cluster_offsets, bool has_nulls) { - int const tid = threadIdx.x + blockIdx.x * blockDim.x; + int const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const group_index = tid; if (group_index >= num_groups) { return; } // we will generate at most delta clusters. - double const delta = static_cast(delta_); - double const delta_norm = delta / (2.0 * M_PI); - double const total_weight = total_weight_[group_index]; + double const delta_norm = static_cast(delta) / (2.0 * M_PI); + double total_weight; + size_type group_size, group_start; + thrust::tie(total_weight, group_size, group_start) = group_info(group_index); // start at the correct place based on our cluster offset. double* cluster_wl = @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_, double cur_limit = 0.0; double cur_weight = 0.0; double next_limit = -1.0; - int last_inserted_index = -1; + int last_inserted_index = -1; // group-relative index into the input stream // compute the first cluster limit double nearest_w; - int nearest_w_index; + int nearest_w_index; // group-relative index into the input stream while (1) { cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w); if (cur_weight >= total_weight) { break; } @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_, // during the reduction step to be trivial. // double adjusted_next_limit = next_limit; - if (nearest_w_index == last_inserted_index || last_inserted_index < 0) { - nearest_w_index = last_inserted_index + 1; - auto [r, i, adjusted] = cumulative_weight(nearest_w_index); - adjusted_next_limit = max(next_limit, adjusted); - (void)r; - (void)i; + if ((last_inserted_index < 0) || // if we haven't inserted anything yet + (nearest_w_index == + last_inserted_index)) { // if we land in the same bucket as the previous cap + + // force the value into this bucket + nearest_w_index = + (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1; + + // the "adjusted" weight must be high enough so that this value will fall in the bucket. + // NOTE: cumulative_weight expects an absolute index into the input value stream, not a + // group-relative index + [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start); + adjusted_next_limit = max(next_limit, adjusted); } cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; last_inserted_index = nearest_w_index; @@ -360,8 +408,8 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @param num_groups The number of input groups * @param nearest_weight A functor which returns the nearest weight in the input * stream that falls before our current cluster limit - * @param total_weight A functor which returns the expected total weight for - * the entire stream of input values for the specified group. + * @param group_info A functor which returns the info for the specified group (total weight, + * size and start offset) * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory @@ -369,12 +417,12 @@ __global__ void generate_cluster_limits_kernel(int delta_, * @returns A tuple containing the set of cluster weight limits for each group, a set of * list-style offsets indicating group sizes, and the total number of clusters */ -template +template std::tuple, std::unique_ptr, size_type> generate_group_cluster_info(int delta, size_type num_groups, NearestWeight nearest_weight, - TotalWeightIter total_weight, + GroupInfo group_info, CumulativeWeight cumulative_weight, bool has_nulls, rmm::cuda_stream_view stream, @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, nullptr, group_num_clusters.begin(), @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta, delta, num_groups, nearest_weight, - total_weight, + group_info, cumulative_weight, group_cluster_wl.begin(), group_num_clusters.begin(), @@ -583,9 +631,8 @@ std::unique_ptr compute_tdigests(int delta, group_cluster_offsets = group_cluster_offsets->view().begin(), group_cumulative_weight] __device__(size_type value_index) -> size_type { // get group index, relative value index within the group and cumulative weight. - auto [group_index, relative_value_index, cumulative_weight] = + [[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] = group_cumulative_weight(value_index); - (void)relative_value_index; auto const num_clusters = group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index]; @@ -616,8 +663,9 @@ std::unique_ptr compute_tdigests(int delta, cudf::mutable_column_view weight_col(*centroid_weights); // reduce the centroids into the clusters - auto output = thrust::make_zip_iterator(thrust::make_tuple( + auto output = thrust::make_zip_iterator(thrust::make_tuple( mean_col.begin(), weight_col.begin(), thrust::make_discard_iterator())); + auto const num_values = std::distance(centroids_begin, centroids_end); thrust::reduce_by_key(rmm::exec_policy(stream), keys, @@ -640,12 +688,6 @@ std::unique_ptr compute_tdigests(int delta, mr); } -// retrieve total weight of scalar inputs by group index -struct scalar_total_weight { - size_type const* group_valid_counts; - __device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; } -}; - // return the min/max value of scalar inputs by group index template struct get_scalar_minmax { @@ -678,17 +720,15 @@ struct typed_group_tdigest { rmm::mr::device_memory_resource* mr) { // first, generate cluster weight information for each input group - auto total_weight = cudf::detail::make_counting_transform_iterator( - 0, scalar_total_weight{group_valid_counts.begin()}); - auto [group_cluster_wl, group_cluster_offsets, total_clusters] = - generate_group_cluster_info(delta, - num_groups, - nearest_value_scalar_weights{}, - total_weight, - cumulative_scalar_weight{group_offsets, group_labels}, - col.null_count() > 0, - stream, - mr); + auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( + delta, + num_groups, + nearest_value_scalar_weights{group_offsets.begin()}, + scalar_group_info{group_valid_counts.begin(), group_offsets.begin()}, + cumulative_scalar_weight{group_offsets, group_labels}, + col.null_count() > 0, + stream, + mr); // device column view. handy because the .element() function // automatically handles fixed-point conversions for us @@ -927,25 +967,15 @@ std::unique_ptr group_merge_tdigest(column_view const& input, auto const delta = max_centroids; // generate cluster info - auto total_group_weight = cudf::detail::make_counting_transform_iterator( - 0, - [outer_offsets = group_offsets.data(), - inner_offsets = tdigest_offsets.begin(), - cumulative_weights = - cumulative_weights->view().begin()] __device__(size_type group_index) -> double { - // if there's no weights in this group of digests at all, return 0. - auto const num_weights = - inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; - auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; - return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; - }); auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, num_groups, nearest_value_centroid_weights{cumulative_weights->view().begin(), group_offsets.data(), tdigest_offsets.begin()}, - total_group_weight, + centroid_group_info{cumulative_weights->view().begin(), + group_offsets.data(), + tdigest_offsets.begin()}, cumulative_centroid_weight{ cumulative_weights->view().begin(), group_labels,