Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gpuCI] Forward-merge branch-22.04 to branch-22.06 [skip gpuci] #10510

Merged
merged 1 commit into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 44 additions & 40 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -419,47 +419,51 @@ __global__ void generate_cluster_limits_kernel(int delta,
// NOTE: can't use structured bindings here.
thrust::tie(nearest_w, nearest_w_index) = nearest_weight(next_limit, group_index);

if (cluster_wl) {
// because of the way the scale functions work, it is possible to generate clusters
// in such a way that we end up with "gaps" where there are no input values that
// fall into a given cluster. An example would be this:
//
// cluster weight limits = 0.00003, 1.008, 3.008
//
// input values(weight) = A(1), B(2), C(3)
//
// naively inserting these values into the clusters simply by taking a lower_bound,
// we would get the following distribution of input values into those 3 clusters.
// (), (A), (B,C)
//
// whereas what we really want is:
//
// (A), (B), (C)
//
// to fix this, we will artificially adjust the output cluster limits to guarantee
// at least 1 input value will be put in each cluster during the reduction step.
// this does not affect final centroid results as we still use the "real" weight limits
// to compute subsequent clusters - the purpose is only to allow cluster selection
// during the reduction step to be trivial.
//
double adjusted_next_limit = next_limit;
if ((last_inserted_index < 0) || // if we haven't inserted anything yet
(nearest_w_index ==
last_inserted_index)) { // if we land in the same bucket as the previous cap

// force the value into this bucket
nearest_w_index =
(last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;

// the "adjusted" weight must be high enough so that this value will fall in the bucket.
// NOTE: cumulative_weight expects an absolute index into the input value stream, not a
// group-relative index
[[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
adjusted_next_limit = max(next_limit, adjusted);
}
cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
last_inserted_index = nearest_w_index;
// because of the way the scale functions work, it is possible to generate clusters
// in such a way that we end up with "gaps" where there are no input values that
// fall into a given cluster. An example would be this:
//
// cluster weight limits = 0.00003, 1.008, 3.008
//
// input values(weight) = A(1), B(2), C(3)
//
// naively inserting these values into the clusters simply by taking a lower_bound,
// we would get the following distribution of input values into those 3 clusters.
// (), (A), (B,C)
//
// whereas what we really want is:
//
// (A), (B), (C)
//
// to fix this, we will artificially adjust the output cluster limits to guarantee
// at least 1 input value will be put in each cluster during the reduction step.
// this does not affect final centroid results as we still use the "real" weight limits
// to compute subsequent clusters - the purpose is only to allow cluster selection
// during the reduction step to be trivial.
//
double adjusted_next_limit = next_limit;
int adjusted_w_index = nearest_w_index;
if ((last_inserted_index < 0) || // if we haven't inserted anything yet
(nearest_w_index ==
last_inserted_index)) { // if we land in the same bucket as the previous cap

// force the value into this bucket
adjusted_w_index = (last_inserted_index == group_size - 1)
? last_inserted_index
: max(adjusted_w_index, last_inserted_index + 1);

// the "adjusted" cluster limit must be high enough so that this value will fall in the
// bucket. NOTE: cumulative_weight expects an absolute index into the input value stream, not
// a group-relative index
[[maybe_unused]] auto [r, i, adjusted_w] = cumulative_weight(adjusted_w_index + group_start);
adjusted_next_limit = max(next_limit, adjusted_w);

// update the weight with our adjusted value.
nearest_w = adjusted_w;
}
if (cluster_wl) { cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; }
last_inserted_index = adjusted_w_index;

group_num_clusters[group_index]++;
cur_limit = next_limit;
}
Expand Down
76 changes: 76 additions & 0 deletions cpp/tests/reductions/tdigest_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,81 @@ struct ReductionTDigestMerge : public cudf::test::BaseFixture {

// Delegates to the tdigest_merge_simple helper, handing it the reduce_op and reduce_merge_op
// functors.
TEST_F(ReductionTDigestMerge, Simple)
{
  tdigest_merge_simple(reduce_op{}, reduce_merge_op{});
}

// Regression test for the cluster generating code: merges digests that contain only a small
// number of centroids, each of which carries a large weight.
TEST_F(ReductionTDigestMerge, FewHeavyCentroids)
{
  // Assembles a one-row tdigest struct column whose children are, in order: a list of
  // (mean, weight) structs, a min column and a max column.
  auto const build_digest = [](std::vector<double> const& means,
                               std::vector<double> const& weights,
                               double min_value,
                               double max_value) {
    cudf::test::fixed_width_column_wrapper<double> mean_col(means.begin(), means.end());
    cudf::test::fixed_width_column_wrapper<double> weight_col(weights.begin(), weights.end());
    cudf::test::structs_column_wrapper centroid_structs({mean_col, weight_col});

    // a single list row that spans every centroid
    cudf::test::fixed_width_column_wrapper<offset_type> list_offsets{
      0, static_cast<int>(means.size())};
    cudf::test::fixed_width_column_wrapper<double> min_col{min_value};
    cudf::test::fixed_width_column_wrapper<double> max_col{max_value};

    std::vector<std::unique_ptr<column>> children;
    children.push_back(cudf::make_lists_column(1,
                                               list_offsets.release(),
                                               centroid_structs.release(),
                                               cudf::UNKNOWN_NULL_COUNT,
                                               rmm::device_buffer{}));
    children.push_back(min_col.release());
    children.push_back(max_col.release());

    // tdigest struct
    return cudf::make_structs_column(1, std::move(children), 0, {});
  };

  // the two input digests
  auto first_digest = build_digest({1.0, 2.0}, {100.0, 50.0}, 1.0, 2.0);
  cudf::tdigest::tdigest_column_view first_digest_view(*first_digest);
  auto second_digest = build_digest({3.0, 4.0}, {200.0, 50.0}, 3.0, 4.0);

  // stack both digests into one two-row column
  std::vector<column_view> digest_views;
  digest_views.push_back(*first_digest);
  digest_views.push_back(*second_digest);
  auto merged_input = cudf::concatenate(digest_views);

  // merge
  auto merged_scalar =
    cudf::reduce(*merged_input,
                 cudf::make_merge_tdigest_aggregation<cudf::reduce_aggregation>(1000),
                 cudf::data_type{cudf::type_id::STRUCT});

  // copy the children of the resulting struct scalar into owning columns and wrap them in a
  // struct column so the result can be compared against the expected digest
  auto scalar_children = static_cast<cudf::struct_scalar const*>(merged_scalar.get())->view();
  std::vector<std::unique_ptr<cudf::column>> owned_children;
  for (cudf::column_view const& child : scalar_children) {
    owned_children.push_back(std::make_unique<cudf::column>(child));
  }
  auto result = cudf::make_structs_column(
    scalar_children.num_rows(), std::move(owned_children), 0, rmm::device_buffer());

  // we expect to see exactly 4 centroids (the same inputs) with properly computed min/max.
  auto expected = build_digest({1.0, 2.0, 3.0, 4.0}, {100.0, 50.0, 200.0, 50.0}, 1.0, 4.0);

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}

} // namespace test
} // namespace cudf