Merge pull request #10510 from rapidsai/branch-22.04
[gpuCI] Forward-merge branch-22.04 to branch-22.06 [skip gpuci]
GPUtester authored Mar 24, 2022
2 parents 8c7260f + a4c450b commit 8d86ae8
Showing 2 changed files with 120 additions and 40 deletions.
84 changes: 44 additions & 40 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
@@ -419,47 +419,51 @@ __global__ void generate_cluster_limits_kernel(int delta,
// NOTE: can't use structured bindings here.
thrust::tie(nearest_w, nearest_w_index) = nearest_weight(next_limit, group_index);

- if (cluster_wl) {
- // because of the way the scale functions work, it is possible to generate clusters
- // in such a way that we end up with "gaps" where there are no input values that
- // fall into a given cluster. An example would be this:
- //
- // cluster weight limits = 0.00003, 1.008, 3.008
- //
- // input values(weight) = A(1), B(2), C(3)
- //
- // naively inserting these values into the clusters simply by taking a lower_bound,
- // we would get the following distribution of input values into those 3 clusters.
- // (), (A), (B,C)
- //
- // whereas what we really want is:
- //
- // (A), (B), (C)
- //
- // to fix this, we will artificially adjust the output cluster limits to guarantee
- // at least 1 input value will be put in each cluster during the reduction step.
- // this does not affect final centroid results as we still use the "real" weight limits
- // to compute subsequent clusters - the purpose is only to allow cluster selection
- // during the reduction step to be trivial.
- //
- double adjusted_next_limit = next_limit;
- if ((last_inserted_index < 0) || // if we haven't inserted anything yet
- (nearest_w_index ==
- last_inserted_index)) { // if we land in the same bucket as the previous cap
-
- // force the value into this bucket
- nearest_w_index =
- (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;
-
- // the "adjusted" weight must be high enough so that this value will fall in the bucket.
- // NOTE: cumulative_weight expects an absolute index into the input value stream, not a
- // group-relative index
- [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
- adjusted_next_limit = max(next_limit, adjusted);
- }
- cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
- last_inserted_index = nearest_w_index;
+ // because of the way the scale functions work, it is possible to generate clusters
+ // in such a way that we end up with "gaps" where there are no input values that
+ // fall into a given cluster. An example would be this:
+ //
+ // cluster weight limits = 0.00003, 1.008, 3.008
+ //
+ // input values(weight) = A(1), B(2), C(3)
+ //
+ // naively inserting these values into the clusters simply by taking a lower_bound,
+ // we would get the following distribution of input values into those 3 clusters.
+ // (), (A), (B,C)
+ //
+ // whereas what we really want is:
+ //
+ // (A), (B), (C)
+ //
+ // to fix this, we will artificially adjust the output cluster limits to guarantee
+ // at least 1 input value will be put in each cluster during the reduction step.
+ // this does not affect final centroid results as we still use the "real" weight limits
+ // to compute subsequent clusters - the purpose is only to allow cluster selection
+ // during the reduction step to be trivial.
+ //
+ double adjusted_next_limit = next_limit;
+ int adjusted_w_index = nearest_w_index;
+ if ((last_inserted_index < 0) || // if we haven't inserted anything yet
+ (nearest_w_index ==
+ last_inserted_index)) { // if we land in the same bucket as the previous cap
+
+ // force the value into this bucket
+ adjusted_w_index = (last_inserted_index == group_size - 1)
+ ? last_inserted_index
+ : max(adjusted_w_index, last_inserted_index + 1);
+
+ // the "adjusted" cluster limit must be high enough so that this value will fall in the
+ // bucket. NOTE: cumulative_weight expects an absolute index into the input value stream, not
+ // a group-relative index
+ [[maybe_unused]] auto [r, i, adjusted_w] = cumulative_weight(adjusted_w_index + group_start);
+ adjusted_next_limit = max(next_limit, adjusted_w);
+
+ // update the weight with our adjusted value.
+ nearest_w = adjusted_w;
+ }
+ if (cluster_wl) { cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; }
+ last_inserted_index = adjusted_w_index;

group_num_clusters[group_index]++;
cur_limit = next_limit;
}
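
For reference, the gap scenario in the comment above can be reproduced with a small standalone sketch. This is an illustration only, not code from this commit: it assumes a lower_bound-style selection that places each input value, by its cumulative weight, into the first cluster whose weight limit covers it (clamped to the last cluster); the helper name assign_clusters and the hard-coded weights are made up for the example. The "adjusted" limits correspond to raising each raw limit to at least the cumulative weight of the value being forced into that cluster, which is what adjusted_next_limit does in the new code.

// Illustration only: the "gap" example from the comment above.
#include <algorithm>
#include <cstdio>
#include <vector>

// Place each cumulative weight into the first cluster whose weight limit is >= that
// weight, clamping to the last cluster.
std::vector<int> assign_clusters(std::vector<double> const& limits,
                                 std::vector<double> const& cum_weights)
{
  std::vector<int> out;
  for (double cw : cum_weights) {
    int idx = static_cast<int>(std::lower_bound(limits.begin(), limits.end(), cw) - limits.begin());
    out.push_back(std::min(idx, static_cast<int>(limits.size()) - 1));
  }
  return out;
}

int main()
{
  // input values A(1), B(2), C(3) -> cumulative weights 1, 3, 6
  std::vector<double> cum_weights{1.0, 3.0, 6.0};

  std::vector<double> raw_limits{0.00003, 1.008, 3.008};  // cluster 0 can hold almost no weight
  std::vector<double> adjusted_limits{1.0, 3.0, 6.0};     // each limit raised to cover one value

  for (int c : assign_clusters(raw_limits, cum_weights)) std::printf("%d ", c);
  std::printf("\n");  // prints "1 2 2", i.e. (), (A), (B,C) -- cluster 0 is empty
  for (int c : assign_clusters(adjusted_limits, cum_weights)) std::printf("%d ", c);
  std::printf("\n");  // prints "0 1 2", i.e. (A), (B), (C) -- one value per cluster
  return 0;
}
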
76 changes: 76 additions & 0 deletions cpp/tests/reductions/tdigest_tests.cu
@@ -88,5 +88,81 @@ struct ReductionTDigestMerge : public cudf::test::BaseFixture {

TEST_F(ReductionTDigestMerge, Simple) { tdigest_merge_simple(reduce_op{}, reduce_merge_op{}); }

// tests an issue with the cluster generating code with a small number of centroids that have large
// weights
TEST_F(ReductionTDigestMerge, FewHeavyCentroids)
{
// digest 1
cudf::test::fixed_width_column_wrapper<double> c0c{1.0, 2.0};
cudf::test::fixed_width_column_wrapper<double> c0w{100.0, 50.0};
cudf::test::structs_column_wrapper c0s({c0c, c0w});
cudf::test::fixed_width_column_wrapper<offset_type> c0_offsets{0, 2};
auto c0l = cudf::make_lists_column(
1, c0_offsets.release(), c0s.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
cudf::test::fixed_width_column_wrapper<double> c0min{1.0};
cudf::test::fixed_width_column_wrapper<double> c0max{2.0};
std::vector<std::unique_ptr<column>> c0_children;
c0_children.push_back(std::move(c0l));
c0_children.push_back(c0min.release());
c0_children.push_back(c0max.release());
// tdigest struct
auto c0 = cudf::make_structs_column(1, std::move(c0_children), 0, {});
cudf::tdigest::tdigest_column_view tdv0(*c0);

// digest 2
cudf::test::fixed_width_column_wrapper<double> c1c{3.0, 4.0};
cudf::test::fixed_width_column_wrapper<double> c1w{200.0, 50.0};
cudf::test::structs_column_wrapper c1s({c1c, c1w});
cudf::test::fixed_width_column_wrapper<offset_type> c1_offsets{0, 2};
auto c1l = cudf::make_lists_column(
1, c1_offsets.release(), c1s.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
cudf::test::fixed_width_column_wrapper<double> c1min{3.0};
cudf::test::fixed_width_column_wrapper<double> c1max{4.0};
std::vector<std::unique_ptr<column>> c1_children;
c1_children.push_back(std::move(c1l));
c1_children.push_back(c1min.release());
c1_children.push_back(c1max.release());
// tdigest struct
auto c1 = cudf::make_structs_column(1, std::move(c1_children), 0, {});

std::vector<column_view> views;
views.push_back(*c0);
views.push_back(*c1);
auto values = cudf::concatenate(views);

// merge
auto scalar_result =
cudf::reduce(*values,
cudf::make_merge_tdigest_aggregation<cudf::reduce_aggregation>(1000),
cudf::data_type{cudf::type_id::STRUCT});

// convert to a table
auto tbl = static_cast<cudf::struct_scalar const*>(scalar_result.get())->view();
std::vector<std::unique_ptr<cudf::column>> cols;
std::transform(
tbl.begin(), tbl.end(), std::back_inserter(cols), [](cudf::column_view const& col) {
return std::make_unique<cudf::column>(col);
});
auto result = cudf::make_structs_column(tbl.num_rows(), std::move(cols), 0, rmm::device_buffer());

// we expect to see exactly 4 centroids (the same inputs) with properly computed min/max.
cudf::test::fixed_width_column_wrapper<double> ec{1.0, 2.0, 3.0, 4.0};
cudf::test::fixed_width_column_wrapper<double> ew{100.0, 50.0, 200.0, 50.0};
cudf::test::structs_column_wrapper es({ec, ew});
cudf::test::fixed_width_column_wrapper<offset_type> e_offsets{0, 4};
auto el = cudf::make_lists_column(
1, e_offsets.release(), es.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
cudf::test::fixed_width_column_wrapper<double> emin{1.0};
cudf::test::fixed_width_column_wrapper<double> emax{4.0};
std::vector<std::unique_ptr<column>> e_children;
e_children.push_back(std::move(el));
e_children.push_back(emin.release());
e_children.push_back(emax.release());
// tdigest struct
auto expected = cudf::make_structs_column(1, std::move(e_children), 0, {});

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}

} // namespace test
} // namespace cudf
