Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. (#9931)

Fixes NVIDIA/spark-rapids#4060

The issue was relatively straightforward. A section of code in the bucket generation step detects "gaps" that would be generated during the reduction step, and it was incorrectly indexing into the list of cumulative weights for the input values. The fundamental change was to replace the `TotalWeightIter` iterator, which returned just the total weight for an input group, with a `GroupInfoFunc` functor that returns the total weight as well as group size information that is used to index the cumulative weights correctly. A sketch of the indexing fix follows.
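
To make the fix concrete, here is a minimal, self-contained sketch in plain C++ (hypothetical data and names; `group_offsets` and `cumulative_weights` only mirror the roles of the arrays in the diff below). Cumulative weights for all groups live in one flat array, so a group-relative index must be shifted by the group's start offset before the lookup:

```cpp
#include <cassert>
#include <vector>

int main()
{
  // Two groups of input values, sizes 3 and 4. Per-group cumulative weights
  // are stored back-to-back in one flat array.
  std::vector<int> group_offsets{0, 3, 7};
  std::vector<double> cumulative_weights{1, 2, 3,      // group 0
                                         2, 4, 6, 8};  // group 1

  int const group_index    = 1;
  int const group_start    = group_offsets[group_index];  // 3
  int const relative_index = 2;  // third value within group 1

  // Buggy lookup: uses the group-relative index directly, silently reading
  // group 0's cumulative weights.
  double const wrong = cumulative_weights[relative_index];                // 3.0

  // Fixed lookup: shift by the group's start offset before indexing.
  double const right = cumulative_weights[group_start + relative_index];  // 6.0

  assert(wrong == 3.0 && right == 6.0);
  return 0;
}
```

The new `scalar_group_info` and `centroid_group_info` functors in the diff return exactly this start offset (along with the total weight and group size) so the kernel can form the absolute index.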

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9931
nvdbaranec authored Jan 6, 2022
1 parent 7392f9f commit 120aa62
Showing 1 changed file with 92 additions and 62 deletions.
154 changes: 92 additions & 62 deletions cpp/src/groupby/sort/group_tdigest.cu
@@ -101,10 +101,14 @@ struct merge_centroids {
  * nearest whole number <= it is floor(3.56) == 3.
  */
 struct nearest_value_scalar_weights {
-  thrust::pair<double, int> operator() __device__(double next_limit, size_type)
+  offset_type const* group_offsets;
+
+  thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index)
   {
-    double const f = floor(next_limit);
-    return {f, max(0, static_cast<int>(next_limit) - 1)};
+    double const f                   = floor(next_limit);
+    auto const relative_weight_index = max(0, static_cast<int>(next_limit) - 1);
+    auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index];
+    return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1};
   }
 };
 
@@ -136,7 +140,8 @@ struct nearest_value_centroid_weights {
                                         group_cumulative_weights);
 
     return index == 0 ? thrust::pair<double, int>{0, 0}
-                      : thrust::pair<double, int>{group_cumulative_weights[index - 1], index - 1};
+                      : thrust::pair<double, int>{group_cumulative_weights[index - 1],
+                                                  static_cast<int>(index) - 1};
   }
 };
 
@@ -187,6 +192,39 @@ struct cumulative_centroid_weight {
   }
 };
 
+// retrieve group info of scalar inputs by group index
+struct scalar_group_info {
+  size_type const* group_valid_counts;
+  offset_type const* group_offsets;
+
+  __device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
+  {
+    return {static_cast<double>(group_valid_counts[group_index]),
+            group_offsets[group_index + 1] - group_offsets[group_index],
+            group_offsets[group_index]};
+  }
+};
+
+// retrieve group info of centroid inputs by group index
+struct centroid_group_info {
+  double const* cumulative_weights;
+  offset_type const* outer_offsets;
+  offset_type const* inner_offsets;
+
+  __device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
+  {
+    // if there's no weights in this group of digests at all, return 0.
+    auto const group_start       = inner_offsets[outer_offsets[group_index]];
+    auto const group_end         = inner_offsets[outer_offsets[group_index + 1]];
+    auto const num_weights       = group_end - group_start;
+    auto const last_weight_index = group_end - 1;
+    return num_weights == 0
+             ? thrust::tuple<double, size_type, size_type>{0, num_weights, group_start}
+             : thrust::tuple<double, size_type, size_type>{
+                 cumulative_weights[last_weight_index], num_weights, group_start};
+  }
+};
+
 struct tdigest_min {
   __device__ double operator()(thrust::tuple<double, size_type> const& t)
   {
@@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm)
  * cluster sizes and total # of clusters, and once to compute the actual
  * weight limits per cluster.
  *
- * @param delta_ tdigest compression level
+ * @param delta tdigest compression level
  * @param num_groups The number of input groups
- * @param nearest_weight_ A functor which returns the nearest weight in the input
+ * @param nearest_weight A functor which returns the nearest weight in the input
  * stream that falls before our current cluster limit
- * @param total_weight_ A functor which returns the expected total weight for
- * the entire stream of input values for the specified group.
+ * @param group_info A functor which returns the info for the specified group (total
+ * weight, size and start offset)
  * @param group_cluster_wl Output. The set of cluster weight limits for each group.
  * @param group_num_clusters Output. The number of output clusters for each input group.
  * @param group_cluster_offsets Offsets per-group to the start of it's clusters
  * @param has_nulls Whether or not the input contains nulls
  *
  */
-template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight>
-__global__ void generate_cluster_limits_kernel(int delta_,
+template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
+__global__ void generate_cluster_limits_kernel(int delta,
                                                size_type num_groups,
                                                NearestWeightFunc nearest_weight,
-                                               TotalWeightIter total_weight_,
+                                               GroupInfo group_info,
                                                CumulativeWeight cumulative_weight,
                                                double* group_cluster_wl,
                                                size_type* group_num_clusters,
                                                offset_type const* group_cluster_offsets,
                                                bool has_nulls)
 {
   int const tid = threadIdx.x + blockIdx.x * blockDim.x;
 
   auto const group_index = tid;
   if (group_index >= num_groups) { return; }
 
   // we will generate at most delta clusters.
-  double const delta        = static_cast<double>(delta_);
-  double const delta_norm   = delta / (2.0 * M_PI);
-  double const total_weight = total_weight_[group_index];
+  double const delta_norm = static_cast<double>(delta) / (2.0 * M_PI);
+  double total_weight;
+  size_type group_size, group_start;
+  thrust::tie(total_weight, group_size, group_start) = group_info(group_index);
 
   // start at the correct place based on our cluster offset.
   double* cluster_wl =
@@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_,
   double cur_limit        = 0.0;
   double cur_weight       = 0.0;
   double next_limit       = -1.0;
-  int last_inserted_index = -1;
+  int last_inserted_index = -1;  // group-relative index into the input stream
 
   // compute the first cluster limit
   double nearest_w;
-  int nearest_w_index;
+  int nearest_w_index;  // group-relative index into the input stream
   while (1) {
     cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w);
     if (cur_weight >= total_weight) { break; }
@@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_,
     // during the reduction step to be trivial.
     //
     double adjusted_next_limit = next_limit;
-    if (nearest_w_index == last_inserted_index || last_inserted_index < 0) {
-      nearest_w_index = last_inserted_index + 1;
-      auto [r, i, adjusted] = cumulative_weight(nearest_w_index);
-      adjusted_next_limit = max(next_limit, adjusted);
-      (void)r;
-      (void)i;
+    if ((last_inserted_index < 0) ||  // if we haven't inserted anything yet
+        (nearest_w_index ==
+         last_inserted_index)) {  // if we land in the same bucket as the previous cap
+
+      // force the value into this bucket
+      nearest_w_index =
+        (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;
+
+      // the "adjusted" weight must be high enough so that this value will fall in the bucket.
+      // NOTE: cumulative_weight expects an absolute index into the input value stream, not a
+      // group-relative index
+      [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
+      adjusted_next_limit                    = max(next_limit, adjusted);
     }
     cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
     last_inserted_index = nearest_w_index;
@@ -360,21 +408,21 @@ __global__ void generate_cluster_limits_kernel(int delta_,
  * @param num_groups The number of input groups
  * @param nearest_weight A functor which returns the nearest weight in the input
  * stream that falls before our current cluster limit
- * @param total_weight A functor which returns the expected total weight for
- * the entire stream of input values for the specified group.
+ * @param group_info A functor which returns the info for the specified group (total weight,
+ * size and start offset)
  * @param has_nulls Whether or not the input data contains nulls
  * @param stream CUDA stream used for device memory operations and kernel launches.
  * @param mr Device memory resource used to allocate the returned column's device memory
  *
  * @returns A tuple containing the set of cluster weight limits for each group, a set of
  * list-style offsets indicating group sizes, and the total number of clusters
  */
-template <typename TotalWeightIter, typename NearestWeight, typename CumulativeWeight>
+template <typename GroupInfo, typename NearestWeight, typename CumulativeWeight>
 std::tuple<rmm::device_uvector<double>, std::unique_ptr<column>, size_type>
 generate_group_cluster_info(int delta,
                             size_type num_groups,
                             NearestWeight nearest_weight,
-                            TotalWeightIter total_weight,
+                            GroupInfo group_info,
                             CumulativeWeight cumulative_weight,
                             bool has_nulls,
                             rmm::cuda_stream_view stream,
@@ -390,7 +438,7 @@ generate_group_cluster_info(int delta,
     delta,
     num_groups,
     nearest_weight,
-    total_weight,
+    group_info,
     cumulative_weight,
     nullptr,
     group_num_clusters.begin(),
@@ -420,7 +468,7 @@ generate_group_cluster_info(int delta,
     delta,
     num_groups,
     nearest_weight,
-    total_weight,
+    group_info,
    cumulative_weight,
     group_cluster_wl.begin(),
     group_num_clusters.begin(),
@@ -583,9 +631,8 @@ std::unique_ptr<column> compute_tdigests(int delta,
      group_cluster_offsets = group_cluster_offsets->view().begin<offset_type>(),
      group_cumulative_weight] __device__(size_type value_index) -> size_type {
       // get group index, relative value index within the group and cumulative weight.
-      auto [group_index, relative_value_index, cumulative_weight] =
+      [[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] =
         group_cumulative_weight(value_index);
-      (void)relative_value_index;
 
       auto const num_clusters =
         group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index];
@@ -616,8 +663,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
   cudf::mutable_column_view weight_col(*centroid_weights);
 
   // reduce the centroids into the clusters
   auto output = thrust::make_zip_iterator(thrust::make_tuple(
     mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator()));
+
   auto const num_values = std::distance(centroids_begin, centroids_end);
   thrust::reduce_by_key(rmm::exec_policy(stream),
                         keys,
@@ -640,12 +688,6 @@ std::unique_ptr<column> compute_tdigests(int delta,
     mr);
 }
 
-// retrieve total weight of scalar inputs by group index
-struct scalar_total_weight {
-  size_type const* group_valid_counts;
-  __device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; }
-};
-
 // return the min/max value of scalar inputs by group index
 template <typename T>
 struct get_scalar_minmax {
@@ -678,17 +720,15 @@ struct typed_group_tdigest {
                                       rmm::mr::device_memory_resource* mr)
   {
     // first, generate cluster weight information for each input group
-    auto total_weight = cudf::detail::make_counting_transform_iterator(
-      0, scalar_total_weight{group_valid_counts.begin()});
-    auto [group_cluster_wl, group_cluster_offsets, total_clusters] =
-      generate_group_cluster_info(delta,
-                                  num_groups,
-                                  nearest_value_scalar_weights{},
-                                  total_weight,
-                                  cumulative_scalar_weight{group_offsets, group_labels},
-                                  col.null_count() > 0,
-                                  stream,
-                                  mr);
+    auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
+      delta,
+      num_groups,
+      nearest_value_scalar_weights{group_offsets.begin()},
+      scalar_group_info{group_valid_counts.begin(), group_offsets.begin()},
+      cumulative_scalar_weight{group_offsets, group_labels},
+      col.null_count() > 0,
+      stream,
+      mr);
 
     // device column view. handy because the .element() function
     // automatically handles fixed-point conversions for us
@@ -927,25 +967,15 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
   auto const delta = max_centroids;
 
   // generate cluster info
-  auto total_group_weight = cudf::detail::make_counting_transform_iterator(
-    0,
-    [outer_offsets = group_offsets.data(),
-     inner_offsets = tdigest_offsets.begin<size_type>(),
-     cumulative_weights =
-       cumulative_weights->view().begin<double>()] __device__(size_type group_index) -> double {
-      // if there's no weights in this group of digests at all, return 0.
-      auto const num_weights =
-        inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]];
-      auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1;
-      return num_weights == 0 ? 0 : cumulative_weights[last_weight_index];
-    });
   auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
     delta,
     num_groups,
     nearest_value_centroid_weights{cumulative_weights->view().begin<double>(),
                                    group_offsets.data(),
                                    tdigest_offsets.begin<size_type>()},
-    total_group_weight,
+    centroid_group_info{cumulative_weights->view().begin<double>(),
+                        group_offsets.data(),
+                        tdigest_offsets.begin<size_type>()},
     cumulative_centroid_weight{
       cumulative_weights->view().begin<double>(),
       group_labels,