-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. #9931
Fixed issue with percentile_approx where output tdigests could have uninitialized data at the end. #9931
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -101,10 +101,14 @@ struct merge_centroids { | |||
* nearest whole number <= it is floor(3.56) == 3. | ||||
*/ | ||||
struct nearest_value_scalar_weights { | ||||
thrust::pair<double, int> operator() __device__(double next_limit, size_type) | ||||
offset_type const* group_offsets; | ||||
|
||||
thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index) | ||||
{ | ||||
double const f = floor(next_limit); | ||||
return {f, max(0, static_cast<int>(next_limit) - 1)}; | ||||
double const f = floor(next_limit); | ||||
auto const relative_weight_index = max(0, static_cast<int>(next_limit) - 1); | ||||
auto const group_size = group_offsets[group_index + 1] - group_offsets[group_index]; | ||||
return {f, relative_weight_index < group_size ? relative_weight_index : group_size - 1}; | ||||
} | ||||
}; | ||||
|
||||
|
@@ -136,7 +140,8 @@ struct nearest_value_centroid_weights { | |||
group_cumulative_weights); | ||||
|
||||
return index == 0 ? thrust::pair<double, int>{0, 0} | ||||
: thrust::pair<double, int>{group_cumulative_weights[index - 1], index - 1}; | ||||
: thrust::pair<double, int>{group_cumulative_weights[index - 1], | ||||
static_cast<int>(index) - 1}; | ||||
} | ||||
}; | ||||
|
||||
|
@@ -187,6 +192,39 @@ struct cumulative_centroid_weight { | |||
} | ||||
}; | ||||
|
||||
// retrieve group info of scalar inputs by group index | ||||
struct scalar_group_info { | ||||
size_type const* group_valid_counts; | ||||
offset_type const* group_offsets; | ||||
|
||||
__device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index) | ||||
{ | ||||
return {static_cast<double>(group_valid_counts[group_index]), | ||||
group_offsets[group_index + 1] - group_offsets[group_index], | ||||
group_offsets[group_index]}; | ||||
} | ||||
}; | ||||
|
||||
// retrieve group info of centroid inputs by group index | ||||
struct centroid_group_info { | ||||
double const* cumulative_weights; | ||||
offset_type const* outer_offsets; | ||||
offset_type const* inner_offsets; | ||||
|
||||
__device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index) | ||||
{ | ||||
// if there's no weights in this group of digests at all, return 0. | ||||
auto const group_start = inner_offsets[outer_offsets[group_index]]; | ||||
auto const group_end = inner_offsets[outer_offsets[group_index + 1]]; | ||||
auto const num_weights = group_end - group_start; | ||||
auto const last_weight_index = group_end - 1; | ||||
return num_weights == 0 | ||||
? thrust::tuple<double, size_type, size_type>{0, num_weights, group_start} | ||||
: thrust::tuple<double, size_type, size_type>{ | ||||
cumulative_weights[last_weight_index], num_weights, group_start}; | ||||
} | ||||
}; | ||||
|
||||
struct tdigest_min { | ||||
__device__ double operator()(thrust::tuple<double, size_type> const& t) | ||||
{ | ||||
|
@@ -233,35 +271,39 @@ __device__ double scale_func_k1(double quantile, double delta_norm) | |||
* | ||||
* @param delta_ tdigest compression level | ||||
* @param num_groups The number of input groups | ||||
* @param nearest_weight_ A functor which returns the nearest weight in the input | ||||
* @param nearest_weight A functor which returns the nearest weight in the input | ||||
* stream that falls before our current cluster limit | ||||
* @param total_weight_ A functor which returns the expected total weight for | ||||
* the entire stream of input values for the specified group. | ||||
* @param group_info A functor which returns the info for the specified group (total | ||||
* weight, size and start offset) | ||||
* @param group_cluster_wl Output. The set of cluster weight limits for each group. | ||||
* @param group_num_clusters Output. The number of output clusters for each input group. | ||||
* @param group_cluster_offsets Offsets per-group to the start of it's clusters | ||||
* @param has_nulls Whether or not the input contains nulls | ||||
* | ||||
*/ | ||||
template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight> | ||||
|
||||
template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight> | ||||
__global__ void generate_cluster_limits_kernel(int delta_, | ||||
size_type num_groups, | ||||
NearestWeightFunc nearest_weight, | ||||
TotalWeightIter total_weight_, | ||||
GroupInfo group_info, | ||||
CumulativeWeight cumulative_weight, | ||||
double* group_cluster_wl, | ||||
size_type* group_num_clusters, | ||||
offset_type const* group_cluster_offsets, | ||||
bool has_nulls) | ||||
{ | ||||
int const tid = threadIdx.x + blockIdx.x * blockDim.x; | ||||
int const tid = threadIdx.x + blockIdx.x * blockDim.x; | ||||
|
||||
auto const group_index = tid; | ||||
if (group_index >= num_groups) { return; } | ||||
|
||||
// we will generate at most delta clusters. | ||||
double const delta = static_cast<double>(delta_); | ||||
double const delta_norm = delta / (2.0 * M_PI); | ||||
double const total_weight = total_weight_[group_index]; | ||||
double const delta = static_cast<double>(delta_); | ||||
nvdbaranec marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
double const delta_norm = delta / (2.0 * M_PI); | ||||
double total_weight; | ||||
size_type group_size, group_start; | ||||
thrust::tie(total_weight, group_size, group_start) = group_info(group_index); | ||||
Comment on lines
+303
to
+305
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume you need a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. Same as cudf/cpp/src/groupby/sort/group_tdigest.cu Line 349 in fbf769f
|
||||
|
||||
// start at the correct place based on our cluster offset. | ||||
double* cluster_wl = | ||||
|
@@ -281,11 +323,11 @@ __global__ void generate_cluster_limits_kernel(int delta_, | |||
double cur_limit = 0.0; | ||||
double cur_weight = 0.0; | ||||
double next_limit = -1.0; | ||||
int last_inserted_index = -1; | ||||
int last_inserted_index = -1; // group-relative index into the input stream | ||||
|
||||
// compute the first cluster limit | ||||
double nearest_w; | ||||
int nearest_w_index; | ||||
int nearest_w_index; // group-relative index into the input stream | ||||
while (1) { | ||||
cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w); | ||||
if (cur_weight >= total_weight) { break; } | ||||
|
@@ -331,9 +373,18 @@ __global__ void generate_cluster_limits_kernel(int delta_, | |||
// during the reduction step to be trivial. | ||||
// | ||||
double adjusted_next_limit = next_limit; | ||||
if (nearest_w_index == last_inserted_index || last_inserted_index < 0) { | ||||
nearest_w_index = last_inserted_index + 1; | ||||
auto [r, i, adjusted] = cumulative_weight(nearest_w_index); | ||||
if ((last_inserted_index < 0) || // if we haven't inserted anything yet | ||||
(nearest_w_index == | ||||
last_inserted_index)) { // if we land in the same bucket as the previous cap | ||||
|
||||
// force the value into this bucket | ||||
nearest_w_index = | ||||
(last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1; | ||||
|
||||
// the "adjusted" weight must be high enough so that this value will fall in the bucket. | ||||
// NOTE: cumulative_weight expects an absolute index into the input value stream, not a | ||||
// group-relative index | ||||
auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start); | ||||
nvdbaranec marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
adjusted_next_limit = max(next_limit, adjusted); | ||||
(void)r; | ||||
(void)i; | ||||
|
@@ -360,21 +411,21 @@ __global__ void generate_cluster_limits_kernel(int delta_, | |||
* @param num_groups The number of input groups | ||||
* @param nearest_weight A functor which returns the nearest weight in the input | ||||
* stream that falls before our current cluster limit | ||||
* @param total_weight A functor which returns the expected total weight for | ||||
* the entire stream of input values for the specified group. | ||||
* @param group_info A functor which returns the info for the specified group (total weight, | ||||
* size and start offset) | ||||
* @param has_nulls Whether or not the input data contains nulls | ||||
* @param stream CUDA stream used for device memory operations and kernel launches. | ||||
* @param mr Device memory resource used to allocate the returned column's device memory | ||||
* | ||||
* @returns A tuple containing the set of cluster weight limits for each group, a set of | ||||
* list-style offsets indicating group sizes, and the total number of clusters | ||||
*/ | ||||
template <typename TotalWeightIter, typename NearestWeight, typename CumulativeWeight> | ||||
template <typename GroupInfo, typename NearestWeight, typename CumulativeWeight> | ||||
std::tuple<rmm::device_uvector<double>, std::unique_ptr<column>, size_type> | ||||
generate_group_cluster_info(int delta, | ||||
size_type num_groups, | ||||
NearestWeight nearest_weight, | ||||
TotalWeightIter total_weight, | ||||
GroupInfo group_info, | ||||
CumulativeWeight cumulative_weight, | ||||
bool has_nulls, | ||||
rmm::cuda_stream_view stream, | ||||
|
@@ -390,7 +441,7 @@ generate_group_cluster_info(int delta, | |||
delta, | ||||
num_groups, | ||||
nearest_weight, | ||||
total_weight, | ||||
group_info, | ||||
cumulative_weight, | ||||
nullptr, | ||||
group_num_clusters.begin(), | ||||
|
@@ -420,7 +471,7 @@ generate_group_cluster_info(int delta, | |||
delta, | ||||
num_groups, | ||||
nearest_weight, | ||||
total_weight, | ||||
group_info, | ||||
cumulative_weight, | ||||
group_cluster_wl.begin(), | ||||
group_num_clusters.begin(), | ||||
|
@@ -616,8 +667,9 @@ std::unique_ptr<column> compute_tdigests(int delta, | |||
cudf::mutable_column_view weight_col(*centroid_weights); | ||||
|
||||
// reduce the centroids into the clusters | ||||
auto output = thrust::make_zip_iterator(thrust::make_tuple( | ||||
auto output = thrust::make_zip_iterator(thrust::make_tuple( | ||||
mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator())); | ||||
|
||||
auto const num_values = std::distance(centroids_begin, centroids_end); | ||||
thrust::reduce_by_key(rmm::exec_policy(stream), | ||||
keys, | ||||
|
@@ -640,12 +692,6 @@ std::unique_ptr<column> compute_tdigests(int delta, | |||
mr); | ||||
} | ||||
|
||||
// retrieve total weight of scalar inputs by group index | ||||
struct scalar_total_weight { | ||||
size_type const* group_valid_counts; | ||||
__device__ double operator()(size_type group_index) { return group_valid_counts[group_index]; } | ||||
}; | ||||
|
||||
// return the min/max value of scalar inputs by group index | ||||
template <typename T> | ||||
struct get_scalar_minmax { | ||||
|
@@ -678,17 +724,15 @@ struct typed_group_tdigest { | |||
rmm::mr::device_memory_resource* mr) | ||||
{ | ||||
// first, generate cluster weight information for each input group | ||||
auto total_weight = cudf::detail::make_counting_transform_iterator( | ||||
0, scalar_total_weight{group_valid_counts.begin()}); | ||||
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = | ||||
generate_group_cluster_info(delta, | ||||
num_groups, | ||||
nearest_value_scalar_weights{}, | ||||
total_weight, | ||||
cumulative_scalar_weight{group_offsets, group_labels}, | ||||
col.null_count() > 0, | ||||
stream, | ||||
mr); | ||||
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( | ||||
delta, | ||||
num_groups, | ||||
nearest_value_scalar_weights{group_offsets.begin()}, | ||||
scalar_group_info{group_valid_counts.begin(), group_offsets.begin()}, | ||||
cumulative_scalar_weight{group_offsets, group_labels}, | ||||
col.null_count() > 0, | ||||
stream, | ||||
mr); | ||||
|
||||
// device column view. handy because the .element() function | ||||
// automatically handles fixed-point conversions for us | ||||
|
@@ -927,25 +971,15 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input, | |||
auto const delta = max_centroids; | ||||
|
||||
// generate cluster info | ||||
auto total_group_weight = cudf::detail::make_counting_transform_iterator( | ||||
0, | ||||
[outer_offsets = group_offsets.data(), | ||||
inner_offsets = tdigest_offsets.begin<size_type>(), | ||||
cumulative_weights = | ||||
cumulative_weights->view().begin<double>()] __device__(size_type group_index) -> double { | ||||
// if there's no weights in this group of digests at all, return 0. | ||||
auto const num_weights = | ||||
inner_offsets[outer_offsets[group_index + 1]] - inner_offsets[outer_offsets[group_index]]; | ||||
auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1; | ||||
return num_weights == 0 ? 0 : cumulative_weights[last_weight_index]; | ||||
}); | ||||
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( | ||||
delta, | ||||
num_groups, | ||||
nearest_value_centroid_weights{cumulative_weights->view().begin<double>(), | ||||
group_offsets.data(), | ||||
tdigest_offsets.begin<size_type>()}, | ||||
total_group_weight, | ||||
centroid_group_info{cumulative_weights->view().begin<double>(), | ||||
group_offsets.data(), | ||||
tdigest_offsets.begin<size_type>()}, | ||||
cumulative_centroid_weight{ | ||||
cumulative_weights->view().begin<double>(), | ||||
group_labels, | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit: You could inline this so that you online calculate it when the calculation is false. Should be negligible though since I assume
num_weights
is almost always nonzero and you're already avoiding actually indexing intocumulative_weights
except when needed.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure the compiler will optimize this correctly, and I think
group_end - 1
is a bit on the cryptic side. This code is already drilling through 2 layers of offsets which is confusing to begin with :)