Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. #9931

Merged
merged 3 commits into from
Jan 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 92 additions & 62 deletions cpp/src/groupby/sort/group_tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ struct merge_centroids {
* nearest whole number <= it is floor(3.56) == 3.
*/
struct nearest_value_scalar_weights {
thrust::pair<double, int> operator() __device__(double next_limit, size_type)
offset_type const* group_offsets;

thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index)
{
double const f = floor(next_limit);
return {f, max(0, static_cast<int>(next_limit) - 1)};
double const f = floor(next_limit);
auto const relative_weight_index = max(0, static_cast<int>(next_limit) - 1);
auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index];
return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1};
}
};

Expand Down Expand Up @@ -136,7 +140,8 @@ struct nearest_value_centroid_weights {
group_cumulative_weights);

return index == 0 ? thrust::pair<double, int>{0, 0}
: thrust::pair<double, int>{group_cumulative_weights[index - 1], index - 1};
: thrust::pair<double, int>{group_cumulative_weights[index - 1],
static_cast<int>(index) - 1};
}
};

Expand Down Expand Up @@ -187,6 +192,39 @@ struct cumulative_centroid_weight {
}
};

// retrieve group info of scalar inputs by group index
struct scalar_group_info {
size_type const* group_valid_counts;
offset_type const* group_offsets;

__device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
{
return {static_cast<double>(group_valid_counts[group_index]),
group_offsets[group_index + 1] - group_offsets[group_index],
group_offsets[group_index]};
}
};

// retrieve group info of centroid inputs by group index
struct centroid_group_info {
double const* cumulative_weights;
offset_type const* outer_offsets;
offset_type const* inner_offsets;

__device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index)
{
// if there's no weights in this group of digests at all, return 0.
auto const group_start = inner_offsets[outer_offsets[group_index]];
auto const group_end = inner_offsets[outer_offsets[group_index + 1]];
auto const num_weights = group_end - group_start;
auto const last_weight_index = group_end - 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: You could inline this so that you online calculate it when the calculation is false. Should be negligible though since I assume num_weights is almost always nonzero and you're already avoiding actually indexing into cumulative_weights except when needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure the compiler will optimize this correctly, and I think group_end - 1 is a bit on the cryptic side. This code is already drilling through 2 layers of offsets which is confusing to begin with :)

return num_weights == 0
? thrust::tuple<double, size_type, size_type>{0, num_weights, group_start}
: thrust::tuple<double, size_type, size_type>{
cumulative_weights[last_weight_index], num_weights, group_start};
}
};

struct tdigest_min {
__device__ double operator()(thrust::tuple<double, size_type> const& t)
{
Expand Down Expand Up @@ -231,37 +269,40 @@ __device__ double scale_func_k1(double quantile, double delta_norm)
* cluster sizes and total # of clusters, and once to compute the actual
* weight limits per cluster.
*
* @param delta_ tdigest compression level
* @param delta tdigest compression level
* @param num_groups The number of input groups
* @param nearest_weight_ A functor which returns the nearest weight in the input
* @param nearest_weight A functor which returns the nearest weight in the input
* stream that falls before our current cluster limit
* @param total_weight_ A functor which returns the expected total weight for
* the entire stream of input values for the specified group.
* @param group_info A functor which returns the info for the specified group (total
* weight, size and start offset)
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of it's clusters
* @param has_nulls Whether or not the input contains nulls
*
*/
template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight>
__global__ void generate_cluster_limits_kernel(int delta_,

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
__global__ void generate_cluster_limits_kernel(int delta,
size_type num_groups,
NearestWeightFunc nearest_weight,
TotalWeightIter total_weight_,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
double* group_cluster_wl,
size_type* group_num_clusters,
offset_type const* group_cluster_offsets,
bool has_nulls)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;
int const tid = threadIdx.x + blockIdx.x * blockDim.x;

auto const group_index = tid;
if (group_index >= num_groups) { return; }

// we will generate at most delta clusters.
double const delta = static_cast<double>(delta_);
double const delta_norm = delta / (2.0 * M_PI);
double const total_weight = total_weight_[group_index];
double const delta_norm = static_cast<double>(delta) / (2.0 * M_PI);
double total_weight;
size_type group_size, group_start;
thrust::tie(total_weight, group_size, group_start) = group_info(group_index);
Comment on lines +303 to +305
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you need a tie here because of thrust::tuple not supporting structured bindings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Same as

// NOTE: can't use structured bindings here.


// start at the correct place based on our cluster offset.
double* cluster_wl =
Expand All @@ -281,11 +322,11 @@ __global__ void generate_cluster_limits_kernel(int delta_,
double cur_limit = 0.0;
double cur_weight = 0.0;
double next_limit = -1.0;
int last_inserted_index = -1;
int last_inserted_index = -1; // group-relative index into the input stream

// compute the first cluster limit
double nearest_w;
int nearest_w_index;
int nearest_w_index; // group-relative index into the input stream
while (1) {
cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w);
if (cur_weight >= total_weight) { break; }
Expand Down Expand Up @@ -331,12 +372,19 @@ __global__ void generate_cluster_limits_kernel(int delta_,
// during the reduction step to be trivial.
//
double adjusted_next_limit = next_limit;
if (nearest_w_index == last_inserted_index || last_inserted_index < 0) {
nearest_w_index = last_inserted_index + 1;
auto [r, i, adjusted] = cumulative_weight(nearest_w_index);
adjusted_next_limit = max(next_limit, adjusted);
(void)r;
(void)i;
if ((last_inserted_index < 0) || // if we haven't inserted anything yet
(nearest_w_index ==
last_inserted_index)) { // if we land in the same bucket as the previous cap

// force the value into this bucket
nearest_w_index =
(last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;

// the "adjusted" weight must be high enough so that this value will fall in the bucket.
// NOTE: cumulative_weight expects an absolute index into the input value stream, not a
// group-relative index
[[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
adjusted_next_limit = max(next_limit, adjusted);
}
cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
last_inserted_index = nearest_w_index;
Expand All @@ -360,21 +408,21 @@ __global__ void generate_cluster_limits_kernel(int delta_,
* @param num_groups The number of input groups
* @param nearest_weight A functor which returns the nearest weight in the input
* stream that falls before our current cluster limit
* @param total_weight A functor which returns the expected total weight for
* the entire stream of input values for the specified group.
* @param group_info A functor which returns the info for the specified group (total weight,
* size and start offset)
* @param has_nulls Whether or not the input data contains nulls
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
* @returns A tuple containing the set of cluster weight limits for each group, a set of
* list-style offsets indicating group sizes, and the total number of clusters
*/
template <typename TotalWeightIter, typename NearestWeight, typename CumulativeWeight>
template <typename GroupInfo, typename NearestWeight, typename CumulativeWeight>
std::tuple<rmm::device_uvector<double>, std::unique_ptr<column>, size_type>
generate_group_cluster_info(int delta,
size_type num_groups,
NearestWeight nearest_weight,
TotalWeightIter total_weight,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
bool has_nulls,
rmm::cuda_stream_view stream,
Expand All @@ -390,7 +438,7 @@ generate_group_cluster_info(int delta,
delta,
num_groups,
nearest_weight,
total_weight,
group_info,
cumulative_weight,
nullptr,
group_num_clusters.begin(),
Expand Down Expand Up @@ -420,7 +468,7 @@ generate_group_cluster_info(int delta,
delta,
num_groups,
nearest_weight,
total_weight,
group_info,
cumulative_weight,
group_cluster_wl.begin(),
group_num_clusters.begin(),
Expand Down Expand Up @@ -583,9 +631,8 @@ std::unique_ptr<column> compute_tdigests(int delta,
group_cluster_offsets = group_cluster_offsets->view().begin<offset_type>(),
group_cumulative_weight] __device__(size_type value_index) -> size_type {
// get group index, relative value index within the group and cumulative weight.
auto [group_index, relative_value_index, cumulative_weight] =
[[maybe_unused]] auto [group_index, relative_value_index, cumulative_weight] =
group_cumulative_weight(value_index);
(void)relative_value_index;

auto const num_clusters =
group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index];
Expand Down Expand Up @@ -616,8 +663,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
cudf::mutable_column_view weight_col(*centroid_weights);

// reduce the centroids into the clusters
auto output = thrust::make_zip_iterator(thrust::make_tuple(
auto output = thrust::make_zip_iterator(thrust::make_tuple(
mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator()));

auto const num_values = std::distance(centroids_begin, centroids_end);
thrust::reduce_by_key(rmm::exec_policy(stream),
keys,
Expand All @@ -640,12 +688,6 @@ std::unique_ptr<column> compute_tdigests(int delta,
mr);
}

// retrieve total weight of scalar inputs by group index
struct scalar_total_weight {
size_type const* group_valid_counts;
__device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; }
};

// return the min/max value of scalar inputs by group index
template <typename T>
struct get_scalar_minmax {
Expand Down Expand Up @@ -678,17 +720,15 @@ struct typed_group_tdigest {
rmm::mr::device_memory_resource* mr)
{
// first, generate cluster weight information for each input group
auto total_weight = cudf::detail::make_counting_transform_iterator(
0, scalar_total_weight{group_valid_counts.begin()});
auto [group_cluster_wl, group_cluster_offsets, total_clusters] =
generate_group_cluster_info(delta,
num_groups,
nearest_value_scalar_weights{},
total_weight,
cumulative_scalar_weight{group_offsets, group_labels},
col.null_count() > 0,
stream,
mr);
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
num_groups,
nearest_value_scalar_weights{group_offsets.begin()},
scalar_group_info{group_valid_counts.begin(), group_offsets.begin()},
cumulative_scalar_weight{group_offsets, group_labels},
col.null_count() > 0,
stream,
mr);

// device column view. handy because the .element() function
// automatically handles fixed-point conversions for us
Expand Down Expand Up @@ -927,25 +967,15 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
auto const delta = max_centroids;

// generate cluster info
auto total_group_weight = cudf::detail::make_counting_transform_iterator(
0,
[outer_offsets = group_offsets.data(),
inner_offsets = tdigest_offsets.begin<size_type>(),
cumulative_weights =
cumulative_weights->view().begin<double>()] __device__(size_type group_index) -> double {
// if there's no weights in this group of digests at all, return 0.
auto const num_weights =
inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]];
auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1;
return num_weights == 0 ? 0 : cumulative_weights[last_weight_index];
});
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
num_groups,
nearest_value_centroid_weights{cumulative_weights->view().begin<double>(),
group_offsets.data(),
tdigest_offsets.begin<size_type>()},
total_group_weight,
centroid_group_info{cumulative_weights->view().begin<double>(),
group_offsets.data(),
tdigest_offsets.begin<size_type>()},
cumulative_centroid_weight{
cumulative_weights->view().begin<double>(),
group_labels,
Expand Down