Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix empty cluster handling in tdigest merge #16675

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,17 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
/**
* @brief Create an empty tdigest column.
*
* An empty tdigest column contains a single row of length 0
* An empty tdigest column contains specified number of rows of length 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* An empty tdigest column contains specified number of rows of length 0.
* An empty tdigest column contains the specified number of rows of length 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I fixed this doc to further clarify it.

*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest column.
*/
CUDF_EXPORT
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest scalar.
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), rmm::mr::get_current_device_resource());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_empty_tdigest_column(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(cudf::get_default_stream(),
rmm::mr::get_current_device_resource());
auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), rmm::mr::get_current_device_resource());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), rmm::mr::get_current_device_resource());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), rmm::mr::get_current_device_resource());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
23 changes: 12 additions & 11 deletions cpp/src/quantiles/tdigest/tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,33 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr);
}

std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets = cudf::make_fixed_width_column(
data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr);
data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
offsets->mutable_view().begin<size_type>(),
offsets->mutable_view().end<size_type>(),
0);

auto min_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto min_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
min_col->mutable_view().begin<double>(),
min_col->mutable_view().end<double>(),
0);
auto max_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto max_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
max_col->mutable_view().begin<double>(),
max_col->mutable_view().end<double>(),
0);

return make_tdigest_column(1,
make_empty_column(type_id::FLOAT64),
make_empty_column(type_id::FLOAT64),
return make_tdigest_column(num_rows,
cudf::make_empty_column(type_id::FLOAT64),
cudf::make_empty_column(type_id::FLOAT64),
std::move(offsets),
std::move(min_col),
std::move(max_col),
Expand All @@ -338,7 +339,7 @@ std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto contents = make_empty_tdigest_column(stream, mr)->release();
auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release();
return std::make_unique<struct_scalar>(
std::move(*std::make_unique<table>(std::move(contents.children))), true, stream, mr);
}
Expand Down
70 changes: 45 additions & 25 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ std::unique_ptr<scalar> to_tdigest_scalar(std::unique_ptr<column>&& tdigest,
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of it's clusters
* @param has_nulls Whether or not the input contains nulls
*
* @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be
* set to false when there is no empty cluster, true otherwise.
*/

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
Expand All @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
double* group_cluster_wl,
size_type* group_num_clusters,
size_type const* group_cluster_offsets,
bool has_nulls)
bool may_have_empty_clusters)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;

Expand All @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
// a group with nothing in it.
group_num_clusters[group_index] = 0;
if (total_weight <= 0) {
// if the input contains nulls we can potentially have a group that generates no
// clusters because -all- of the input values are null. in that case, the reduce_by_key call
// in the tdigest generation step will need a location to store the unused reduction value for
// that group of nulls. these "stubs" will be postprocessed out afterwards.
if (has_nulls) { group_num_clusters[group_index] = 1; }
// If the input contains empty clusters, we can potentially have a group that also generates
// empty clusters because -all- of the input values are null or empty cluster. In that case, the
// `reduce_by_key` call in the tdigest generation step will need a location to store the unused
// reduction value for that group of nulls and empty clusters. These "stubs" will be
// postprocessed out afterwards.
if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; }
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
return;
}

Expand Down Expand Up @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
* stream that falls before our current cluster limit
* @param group_info A functor which returns the info for the specified group (total weight,
* size and start offset)
* @param has_nulls Whether or not the input data contains nulls
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta,
NearestWeight nearest_weight,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta,
nullptr,
group_num_clusters.begin(),
nullptr,
has_nulls);
may_have_empty_clusters);

// generate group cluster offsets (where the clusters for a given group start and end)
auto group_cluster_offsets = cudf::make_numeric_column(
Expand Down Expand Up @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta,
group_cluster_wl.begin(),
group_num_clusters.begin(),
group_cluster_offsets->view().begin<size_type>(),
has_nulls);
may_have_empty_clusters);

return {std::move(group_cluster_wl),
std::move(group_cluster_offsets),
Expand All @@ -580,7 +582,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
std::unique_ptr<column>&& offsets,
std::unique_ptr<column>&& min_col,
std::unique_ptr<column>&& max_col,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -595,7 +597,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; };

size_type const num_stubs = [&]() {
if (!has_nulls) { return 0; }
if (!may_have_empty_clusters) { return 0; }
auto iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_type>(is_stub_digest));
return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows);
Expand Down Expand Up @@ -661,6 +663,10 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
mr);
}

/**
* @brief A functor which returns the cluster index within a group that the value at
* the given value index falls into.
*/
template <typename CumulativeWeight>
struct compute_tdigests_keys_fn {
int const delta;
Expand Down Expand Up @@ -706,16 +712,17 @@ struct compute_tdigests_keys_fn {
* boundaries.
*
* @param delta tdigest compression level
* @param values_begin Beginning of the range of input values.
* @param values_end End of the range of input values.
* @param centroids_begin Beginning of the range of centroids.
* @param centroids_end End of the range of centroids.
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
* @param cumulative_weight Functor which returns cumulative weight and group information for
* an absolute input value index.
* @param min_col Column containing the minimum value per group.
* @param max_col Column containing the maximum value per group.
* @param group_cluster_wl Cluster weight limits for each group.
* @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
* @param total_clusters Total number of clusters in all groups.
* @param has_nulls Whether or not the input contains nulls
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -731,7 +738,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
rmm::device_uvector<double> const& group_cluster_wl,
std::unique_ptr<column>&& group_cluster_offsets,
size_type total_clusters,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -750,7 +757,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
// double // max
// }
//
if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (total_clusters == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

// each input group represents an individual tdigest. within each tdigest, we want the keys
// to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
Expand Down Expand Up @@ -793,7 +802,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
std::move(group_cluster_offsets),
std::move(min_col),
std::move(max_col),
has_nulls,
may_have_empty_clusters,
stream,
mr);
}
Expand Down Expand Up @@ -1145,8 +1154,13 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
auto merged =
cudf::detail::concatenate(tdigest_views, stream, rmm::mr::get_current_device_resource());

auto merged_weights = merged->get_column(1).view();
// If there are no values, we can simply return a column that has only empty tdigests.
if (merged_weights.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr);
}
Comment on lines +1157 to +1161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be just auto const merged_weights_size = merged->get_column(1).size();. No need to get a view.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not even needed to create a temp variable and it can directly be used in if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The merged_weights variable is an existing variable and used in other places as well. I just have moved it from its original line. That being said, I can remove it if it's a problem. I thought creating a column view is cheap. Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A column_view is not a plain type. It has several internal structures such as vector, pointer, counter etc. Creating an instance will require copying/initializing all these fields. That can be "cheap" but still incur some overhead thus we should avoid doing so if possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ttnghia. Your explanation aligns with my understanding. In that case, I think this is OK because the shortcut in this if clause will be used only when every key has the empty cluster. This is quite a rare case and won't happen often in production.


// generate cumulative weights
auto merged_weights = merged->get_column(1).view();
auto cumulative_weights = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
auto keys = cudf::detail::make_counting_transform_iterator(
Expand All @@ -1161,6 +1175,10 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

auto const delta = max_centroids;

// We do not know whether there is any empty cluster in the input without actually reading the
// data, which could be expensive. So, we just assume that there could be empty clusters.
auto const may_have_empty_clusters = true;
Comment on lines +1178 to +1180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we would never change it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this PR, this flag was fixed to false, which implies that empty clusters should never be found during the merge. This is not always true, which is the root cause of the bug. It is rather always true to assume that there might be some clusters until we inspect all clusters, which is expensive. Having this flag fixed to true might be always not optimal when there is indeed no empty cluster. We can make this better later for this case if that turns out to be a problem.


// generate cluster info
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
Expand All @@ -1177,7 +1195,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
false,
may_have_empty_clusters,
stream,
mr);

Expand All @@ -1202,7 +1220,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
false,
may_have_empty_clusters,
stream,
mr);
}
Expand Down Expand Up @@ -1267,7 +1285,9 @@ std::unique_ptr<column> group_tdigest(column_view const& col,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (col.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

auto const delta = max_centroids;
return cudf::type_dispatcher(col.type(),
Expand All @@ -1293,7 +1313,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
tdigest_column_view tdv(input);

if (num_groups == 0 || input.size() == 0) {
return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr);
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

// bring group offsets back to the host
Expand Down
Loading
Loading