Fix an issue with tdigest merge aggregations. (#10506)
A case was found where merging tdigests that contain a very small number of high-weight centroids could produce an invalid resulting tdigest. The issue was in the "gap fixup" code during the cluster generation step.

The diff here is unfortunate, but the change is fundamentally: make sure we run through the gap-fixing code during the first pass (where we're just counting the number of buckets), and update the nearest-weight (nearest_w) variable representing the centroid we just bucketed.

~~Leaving a Do Not Merge tag on here for now to get confirmation of fix from Spark team.~~
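
For context, the "gap" the fixup addresses can be reproduced on the host in a few lines. This is a minimal sketch built on the worked example from the kernel comments below; `std::lower_bound` stands in for the device-side cluster search, and clamping to the last cluster is an assumption of the sketch, not the kernel's exact logic:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  // the example from the kernel comments: inputs A(1), B(2), C(3),
  // so the cumulative weights are 1, 3, 6
  std::vector<double> cumulative{1.0, 3.0, 6.0};
  // raw cluster weight limits produced by the scale function
  std::vector<double> limits{0.00003, 1.008, 3.008};

  // naive bucketing: put each input in the first cluster whose weight
  // limit is >= the input's cumulative weight, clamped to the last cluster
  for (std::size_t i = 0; i < cumulative.size(); ++i) {
    auto cluster = std::lower_bound(limits.begin(), limits.end(), cumulative[i]) - limits.begin();
    cluster      = std::min(cluster, static_cast<std::ptrdiff_t>(limits.size() - 1));
    std::printf("input %c -> cluster %td\n", static_cast<char>('A' + i), cluster);
  }
  // prints A -> 1, B -> 2, C -> 2: cluster 0 is an empty "gap",
  // i.e. the (), (A), (B,C) distribution the fixup exists to prevent
}
```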

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - MithunR (https://github.com/mythrocks)

URL: #10506
nvdbaranec authored Mar 24, 2022
1 parent 54918d8 commit a4c450b
Showing 2 changed files with 120 additions and 40 deletions.
84 changes: 44 additions & 40 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
@@ -419,47 +419,51 @@ __global__ void generate_cluster_limits_kernel(int delta,
// NOTE: can't use structured bindings here.
thrust::tie(nearest_w, nearest_w_index) = nearest_weight(next_limit, group_index);

- if (cluster_wl) {
-   // because of the way the scale functions work, it is possible to generate clusters
-   // in such a way that we end up with "gaps" where there are no input values that
-   // fall into a given cluster. An example would be this:
-   //
-   // cluster weight limits = 0.00003, 1.008, 3.008
-   //
-   // input values(weight) = A(1), B(2), C(3)
-   //
-   // naively inserting these values into the clusters simply by taking a lower_bound,
-   // we would get the following distribution of input values into those 3 clusters.
-   // (), (A), (B,C)
-   //
-   // whereas what we really want is:
-   //
-   // (A), (B), (C)
-   //
-   // to fix this, we will artificially adjust the output cluster limits to guarantee
-   // at least 1 input value will be put in each cluster during the reduction step.
-   // this does not affect final centroid results as we still use the "real" weight limits
-   // to compute subsequent clusters - the purpose is only to allow cluster selection
-   // during the reduction step to be trivial.
-   //
-   double adjusted_next_limit = next_limit;
-   if ((last_inserted_index < 0) ||  // if we haven't inserted anything yet
-       (nearest_w_index ==
-        last_inserted_index)) {  // if we land in the same bucket as the previous cap
-
-     // force the value into this bucket
-     nearest_w_index =
-       (last_inserted_index == group_size - 1) ? last_inserted_index : last_inserted_index + 1;
-
-     // the "adjusted" weight must be high enough so that this value will fall in the bucket.
-     // NOTE: cumulative_weight expects an absolute index into the input value stream, not a
-     // group-relative index
-     [[maybe_unused]] auto [r, i, adjusted] = cumulative_weight(nearest_w_index + group_start);
-     adjusted_next_limit = max(next_limit, adjusted);
-   }
-   cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
-   last_inserted_index = nearest_w_index;
+ // because of the way the scale functions work, it is possible to generate clusters
+ // in such a way that we end up with "gaps" where there are no input values that
+ // fall into a given cluster. An example would be this:
+ //
+ // cluster weight limits = 0.00003, 1.008, 3.008
+ //
+ // input values(weight) = A(1), B(2), C(3)
+ //
+ // naively inserting these values into the clusters simply by taking a lower_bound,
+ // we would get the following distribution of input values into those 3 clusters.
+ // (), (A), (B,C)
+ //
+ // whereas what we really want is:
+ //
+ // (A), (B), (C)
+ //
+ // to fix this, we will artificially adjust the output cluster limits to guarantee
+ // at least 1 input value will be put in each cluster during the reduction step.
+ // this does not affect final centroid results as we still use the "real" weight limits
+ // to compute subsequent clusters - the purpose is only to allow cluster selection
+ // during the reduction step to be trivial.
+ //
+ double adjusted_next_limit = next_limit;
+ int adjusted_w_index       = nearest_w_index;
+ if ((last_inserted_index < 0) ||  // if we haven't inserted anything yet
+     (nearest_w_index ==
+      last_inserted_index)) {  // if we land in the same bucket as the previous cap
+
+   // force the value into this bucket
+   adjusted_w_index = (last_inserted_index == group_size - 1)
+                        ? last_inserted_index
+                        : max(adjusted_w_index, last_inserted_index + 1);
+
+   // the "adjusted" cluster limit must be high enough so that this value will fall in the
+   // bucket. NOTE: cumulative_weight expects an absolute index into the input value stream, not
+   // a group-relative index
+   [[maybe_unused]] auto [r, i, adjusted_w] = cumulative_weight(adjusted_w_index + group_start);
+   adjusted_next_limit = max(next_limit, adjusted_w);
+
+   // update the weight with our adjusted value.
+   nearest_w = adjusted_w;
+ }
+ if (cluster_wl) { cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit; }
+ last_inserted_index = adjusted_w_index;

group_num_clusters[group_index]++;
cur_limit = next_limit;
}
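
To see how the adjusted limits repair that distribution, here is a hedged host-side sketch of what the fixed pass computes for the same example. `fix_gaps`, `cum_w`, and the "largest cumulative weight at or below the limit" stand-in for `nearest_weight` are simplifications for illustration, not the actual device code:

```cpp
#include <algorithm>
#include <cstdio>
#include <vector>

// Sketch of the gap fixup over one group's raw cluster limits. cum_w[i] plays
// the role of cumulative_weight for input index i; the pass structure and
// nearest_weight are simplified relative to the real kernel.
std::vector<double> fix_gaps(std::vector<double> const& limits_in,
                             std::vector<double> const& cum_w)
{
  int const group_size    = static_cast<int>(cum_w.size());
  int last_inserted_index = -1;
  std::vector<double> out;
  for (double next_limit : limits_in) {
    // nearest input at or below this limit (stand-in for nearest_weight)
    int nearest_w_index = static_cast<int>(
      std::upper_bound(cum_w.begin(), cum_w.end(), next_limit) - cum_w.begin() - 1);
    nearest_w_index = std::max(nearest_w_index, 0);

    double adjusted_next_limit = next_limit;
    int adjusted_w_index       = nearest_w_index;
    if (last_inserted_index < 0 || nearest_w_index == last_inserted_index) {
      // force at least one input into this cluster...
      adjusted_w_index = (last_inserted_index == group_size - 1)
                           ? last_inserted_index
                           : std::max(adjusted_w_index, last_inserted_index + 1);
      // ...and raise the limit so that input really lands here
      adjusted_next_limit = std::max(next_limit, cum_w[adjusted_w_index]);
    }
    out.push_back(adjusted_next_limit);
    last_inserted_index = adjusted_w_index;
  }
  return out;
}

int main()
{
  // the example from the kernel comments; the fixed limits come out as
  // 1, 3, 6, so lower_bound bucketing now yields (A), (B), (C)
  for (double l : fix_gaps({0.00003, 1.008, 3.008}, {1.0, 3.0, 6.0}))
    std::printf("%f\n", l);
}
```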
76 changes: 76 additions & 0 deletions cpp/tests/reductions/tdigest_tests.cu
@@ -88,5 +88,81 @@ struct ReductionTDigestMerge : public cudf::test::BaseFixture {

TEST_F(ReductionTDigestMerge, Simple) { tdigest_merge_simple(reduce_op{}, reduce_merge_op{}); }

+ // tests an issue with the cluster generating code with a small number of centroids that have large
+ // weights
+ TEST_F(ReductionTDigestMerge, FewHeavyCentroids)
+ {
+   // digest 1
+   cudf::test::fixed_width_column_wrapper<double> c0c{1.0, 2.0};
+   cudf::test::fixed_width_column_wrapper<double> c0w{100.0, 50.0};
+   cudf::test::structs_column_wrapper c0s({c0c, c0w});
+   cudf::test::fixed_width_column_wrapper<offset_type> c0_offsets{0, 2};
+   auto c0l = cudf::make_lists_column(
+     1, c0_offsets.release(), c0s.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
+   cudf::test::fixed_width_column_wrapper<double> c0min{1.0};
+   cudf::test::fixed_width_column_wrapper<double> c0max{2.0};
+   std::vector<std::unique_ptr<column>> c0_children;
+   c0_children.push_back(std::move(c0l));
+   c0_children.push_back(c0min.release());
+   c0_children.push_back(c0max.release());
+   // tdigest struct
+   auto c0 = cudf::make_structs_column(1, std::move(c0_children), 0, {});
+   cudf::tdigest::tdigest_column_view tdv0(*c0);
+
+   // digest 2
+   cudf::test::fixed_width_column_wrapper<double> c1c{3.0, 4.0};
+   cudf::test::fixed_width_column_wrapper<double> c1w{200.0, 50.0};
+   cudf::test::structs_column_wrapper c1s({c1c, c1w});
+   cudf::test::fixed_width_column_wrapper<offset_type> c1_offsets{0, 2};
+   auto c1l = cudf::make_lists_column(
+     1, c1_offsets.release(), c1s.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
+   cudf::test::fixed_width_column_wrapper<double> c1min{3.0};
+   cudf::test::fixed_width_column_wrapper<double> c1max{4.0};
+   std::vector<std::unique_ptr<column>> c1_children;
+   c1_children.push_back(std::move(c1l));
+   c1_children.push_back(c1min.release());
+   c1_children.push_back(c1max.release());
+   // tdigest struct
+   auto c1 = cudf::make_structs_column(1, std::move(c1_children), 0, {});
+
+   std::vector<column_view> views;
+   views.push_back(*c0);
+   views.push_back(*c1);
+   auto values = cudf::concatenate(views);
+
+   // merge
+   auto scalar_result =
+     cudf::reduce(*values,
+                  cudf::make_merge_tdigest_aggregation<cudf::reduce_aggregation>(1000),
+                  cudf::data_type{cudf::type_id::STRUCT});
+
+   // convert to a table
+   auto tbl = static_cast<cudf::struct_scalar const*>(scalar_result.get())->view();
+   std::vector<std::unique_ptr<cudf::column>> cols;
+   std::transform(
+     tbl.begin(), tbl.end(), std::back_inserter(cols), [](cudf::column_view const& col) {
+       return std::make_unique<cudf::column>(col);
+     });
+   auto result = cudf::make_structs_column(tbl.num_rows(), std::move(cols), 0, rmm::device_buffer());
+
+   // we expect to see exactly 4 centroids (the same inputs) with properly computed min/max.
+   cudf::test::fixed_width_column_wrapper<double> ec{1.0, 2.0, 3.0, 4.0};
+   cudf::test::fixed_width_column_wrapper<double> ew{100.0, 50.0, 200.0, 50.0};
+   cudf::test::structs_column_wrapper es({ec, ew});
+   cudf::test::fixed_width_column_wrapper<offset_type> e_offsets{0, 4};
+   auto el = cudf::make_lists_column(
+     1, e_offsets.release(), es.release(), cudf::UNKNOWN_NULL_COUNT, rmm::device_buffer{});
+   cudf::test::fixed_width_column_wrapper<double> emin{1.0};
+   cudf::test::fixed_width_column_wrapper<double> emax{4.0};
+   std::vector<std::unique_ptr<column>> e_children;
+   e_children.push_back(std::move(el));
+   e_children.push_back(emin.release());
+   e_children.push_back(emax.release());
+   // tdigest struct
+   auto expected = cudf::make_structs_column(1, std::move(e_children), 0, {});
+
+   CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
+ }
+
} // namespace test
} // namespace cudf
