Skip to content

Commit

Permalink
Merge tdigest aggregation for cudf::reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
nvdbaranec committed Mar 14, 2022
1 parent 83f4d31 commit 6a2d50e
Show file tree
Hide file tree
Showing 12 changed files with 666 additions and 484 deletions.
6 changes: 3 additions & 3 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,11 @@ add_library(cudf::cudf ALIAS cudf)

add_library(
cudftestutil STATIC
tests/io/metadata_utilities.cpp
tests/io/metadata_utilities.cpp
tests/quantiles/tdigest_utilities.cu
tests/utilities/base_fixture.cpp
tests/utilities/base_fixture.cpp
tests/utilities/column_utilities.cu
tests/utilities/table_utilities.cu
tests/utilities/table_utilities.cu
tests/strings/utilities.cpp
)

Expand Down
130 changes: 126 additions & 4 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,95 @@ namespace detail {
namespace tdigest {

/**
* @brief Generate a tdigest column from a grouped set of numeric input values.
*
* The tdigest column produced is of the following structure:
*
* struct {
* // centroids for the digest
* list {
* struct {
* double // mean
* double // weight
* },
* ...
* }
* // these are from the input stream, not the centroids. they are used
* // during the percentile_approx computation near the beginning or
* // end of the quantiles
* double // min
* double // max
* }
*
* Each output row is a single tdigest. The length of the row is the "size" of the
* tdigest, each element of which represents a weighted centroid (mean, weight).
*
* @param values Grouped (and sorted) numeric values to generate the tdigests from.
* @param group_offsets Offsets of groups' starting points within @p values.
* @param group_labels 0-based ID of group that the corresponding value belongs to
* @param group_valid_counts Per-group counts of valid elements.
* @param num_groups Number of groups.
* @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
* values result in a larger, more precise tdigest.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
* @returns tdigest column, with 1 tdigest per row
*/
std::unique_ptr<column> group_tdigest(column_view const& values,
cudf::device_span<size_type const> group_offsets,
cudf::device_span<size_type const> group_labels,
cudf::device_span<size_type const> group_valid_counts,
size_type num_groups,
int max_centroids,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Merges tdigests within the same group to generate a new tdigest.
*
* The input @p values holds already-built tdigests (not raw numeric values); the tdigests
* that belong to the same group are merged into one output tdigest.
*
* The tdigest column produced is of the following structure:
*
* struct {
* // centroids for the digest
* list {
* struct {
* double // mean
* double // weight
* },
* ...
* }
* // these are from the input stream, not the centroids. they are used
* // during the percentile_approx computation near the beginning or
* // end of the quantiles
* double // min
* double // max
* }
*
* Each output row is a single tdigest. The length of the row is the "size" of the
* tdigest, each element of which represents a weighted centroid (mean, weight).
*
* @param values Grouped tdigests to merge.
* @param group_offsets Offsets of groups' starting points within @p values.
* @param group_labels 0-based ID of group that the corresponding value belongs to.
* @param num_groups Number of groups.
* @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
* values result in a larger, more precise tdigest.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns tdigest column, with 1 tdigest per row
*/
std::unique_ptr<column> group_merge_tdigest(column_view const& values,
cudf::device_span<size_type const> group_offsets,
cudf::device_span<size_type const> group_labels,
size_type num_groups,
int max_centroids,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Create a tdigest column from its constituent components.
*
* @param num_rows The number of rows in the output column.
* @param centroid_means The inner means column. These values are partitioned into lists by the
Expand Down Expand Up @@ -172,7 +260,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& values,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate a tdigest column from a set of numeric input values.
* @brief Generate a tdigest scalar from a set of numeric input values.
*
* The tdigest scalar produced is of the following structure:
*
* struct {
Expand All @@ -196,7 +284,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& values,
* @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
* values result in a larger, more precise tdigest.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @param mr Device memory resource used to allocate the returned scalar's device memory
*
* @returns tdigest scalar
*/
Expand All @@ -205,6 +293,40 @@ std::unique_ptr<scalar> reduce_tdigest(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Merges multiple tdigest columns to generate a new tdigest scalar.
*
* The tdigest scalar produced is of the following structure:
*
* struct {
* // centroids for the digest
* list {
* struct {
* double // mean
* double // weight
* },
* ...
* }
* // these are from the input stream, not the centroids. they are used
* // during the percentile_approx computation near the beginning or
* // end of the quantiles
* double // min
* double // max
* }
*
* @param input tdigests to merge.
* @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
* values result in a larger, more precise tdigest.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned scalar's device memory
*
* @returns tdigest scalar
*/
std::unique_ptr<scalar> reduce_merge_tdigest(column_view const& input,
int max_centroids,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace tdigest
} // namespace detail
} // namespace cudf
133 changes: 101 additions & 32 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,9 @@ struct column_max {
}
};

struct tdigest_gen_grouped {
template <
typename T,
typename std::enable_if_t<cudf::is_numeric<T>() || cudf::is_fixed_point<T>()>* = nullptr>
std::unique_ptr<column> operator()(column_view const& keys, column_view const& values, int delta)
{
cudf::table_view t({keys});
cudf::groupby::groupby gb(t);
std::vector<cudf::groupby::aggregation_request> requests;
std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggregations;
aggregations.push_back(cudf::make_tdigest_aggregation<cudf::groupby_aggregation>(delta));
requests.push_back({values, std::move(aggregations)});
auto result = gb.aggregate(requests);
return std::move(result.second[0].results[0]);
}

template <
typename T,
typename std::enable_if_t<!cudf::is_numeric<T>() && !cudf::is_fixed_point<T>()>* = nullptr>
std::unique_ptr<column> operator()(column_view const& keys, column_view const& values, int delta)
{
CUDF_FAIL("Invalid tdigest test type");
}
};

/**
* @brief Functor to generate a tdigest.
*/
struct tdigest_gen {
template <
typename T,
Expand All @@ -97,9 +75,15 @@ struct tdigest_gen {
}
};

/**
* @brief Compare a tdigest column against a sampling of expected values.
*
* @param tdv View of the tdigest column to check.
* @param h_expected The sampled expected values to compare against (presumably host-side
* data, going by the `h_` prefix; confirm against the definition).
*/
void tdigest_sample_compare(cudf::tdigest::tdigest_column_view const& tdv,
std::vector<expected_value> const& h_expected);

/**
* @brief Compare the min/max values of a tdigest against inputs.
*/
template <typename T>
void tdigest_minmax_compare(cudf::tdigest::tdigest_column_view const& tdv,
column_view const& input_values)
Expand Down Expand Up @@ -136,11 +120,14 @@ struct expected_tdigest {
double min, max;
};

/**
* @brief Create an expected tdigest column given component inputs.
*
* @param groups The component inputs, as a vector of `expected_tdigest` entries.
*
* @returns The created column.
*/
std::unique_ptr<column> make_expected_tdigest_column(std::vector<expected_tdigest> const& groups);

// shared tests for groupby/reduction.
// shared test for groupby/reduction.
template <typename T, typename Func>
void simple_tdigest_aggregation(Func op)
void tdigest_simple_aggregation(Func op)
{
// create a tdigest that has far fewer values in it than the delta value. this should result
// in every value remaining uncompressed
Expand All @@ -162,8 +149,9 @@ void simple_tdigest_aggregation(Func op)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}

// shared test for groupby/reduction.
template <typename T, typename Func>
void simple_with_null_tdigest_aggregation(Func op)
void tdigest_simple_with_nulls_aggregation(Func op)
{
// create a tdigest that has far fewer values in it than the delta value. this should result
// in every value remaining uncompressed
Expand All @@ -186,8 +174,9 @@ void simple_with_null_tdigest_aggregation(Func op)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}

// shared test for groupby/reduction.
template <typename T, typename Func>
void simple_all_null_tdigest_aggregation(Func op)
void tdigest_simple_all_nulls_aggregation(Func op)
{
// create a tdigest that has far fewer values in it than the delta value. this should result
// in every value remaining uncompressed
Expand All @@ -203,8 +192,9 @@ void simple_all_null_tdigest_aggregation(Func op)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}

// shared test for groupby/reduction.
template <typename Func>
void simple_large_input_double_tdigest_aggregation(Func op)
void tdigest_simple_large_input_double_aggregation(Func op)
{
// these tests are being done explicitly because of the way we have to precompute the correct
// answers. since the input values generated by the generate_distribution() function below are
Expand Down Expand Up @@ -276,8 +266,9 @@ void simple_large_input_double_tdigest_aggregation(Func op)
}
}

// shared test for groupby/reduction.
template <typename Func>
void simple_large_input_int_tdigest_aggregation(Func op)
void tdigest_simple_large_input_int_aggregation(Func op)
{
// these tests are being done explicitly because of the way we have to precompute the correct
// answers. since the input values generated by the generate_distribution() function below are
Expand Down Expand Up @@ -352,8 +343,9 @@ void simple_large_input_int_tdigest_aggregation(Func op)
}
}

// shared test for groupby/reduction.
template <typename Func>
void simple_large_input_decimal_tdigest_aggregation(Func op)
void tdigest_simple_large_input_decimal_aggregation(Func op)
{
// these tests are being done explicitly because of the way we have to precompute the correct
// answers. since the input values generated by the generate_distribution() function below are
Expand Down Expand Up @@ -426,5 +418,82 @@ void simple_large_input_decimal_tdigest_aggregation(Func op)
}
}

// Note: there is no need to test different types here as the internals of a tdigest are always
// the same regardless of input.
template <typename Func, typename MergeFunc>
void tdigest_merge_simple(Func op, MergeFunc merge_op)
{
auto values = generate_standardized_percentile_distribution(data_type{type_id::FLOAT64});
CUDF_EXPECTS(values->size() == 750000, "Unexpected distribution size");

auto split_values = cudf::split(*values, {250000, 500000});

int const delta = 1000;

// generate seperate digests
std::vector<std::unique_ptr<column>> parts;
auto iter = thrust::make_counting_iterator(0);
std::transform(
iter, iter + split_values.size(), std::back_inserter(parts), [&split_values, delta, op](int i) {
return op(split_values[i], delta);
});
std::vector<column_view> part_views;
std::transform(parts.begin(),
parts.end(),
std::back_inserter(part_views),
[](std::unique_ptr<column> const& col) { return col->view(); });

// merge delta = 1000
{
int const merge_delta = 1000;

// merge them
auto merge_input = cudf::concatenate(part_views);
auto result = merge_op(*merge_input, merge_delta);
cudf::tdigest::tdigest_column_view tdv(*result);

// verify centroids
std::vector<expected_value> expected{{0, 0.00013945158577498588, 2},
{10, 0.04804393446447510763, 50},
{59, 1.68846964439246893797, 284},
{250, 33.36323141295877547918, 1479},
{368, 65.36307727957283475462, 2292},
{409, 73.95399208218296394080, 1784},
{490, 87.67566167909056673579, 1570},
{491, 87.83119717763385381204, 1570},
{500, 89.24891838334393412424, 1555},
{578, 95.87182997389099625707, 583},
{625, 98.20470345147104751504, 405},
{700, 99.96818381983835877236, 56},
{711, 99.99970905482754801596, 1}};
tdigest_sample_compare(tdv, expected);

// verify min/max
tdigest_minmax_compare<double>(tdv, *values);
}
}

// shared test for groupby/reduction.
//
// Merging a column made only of empty tdigests is expected to produce a column equal to a
// single empty tdigest column.
template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
  // 3 empty tdigests all in the same group
  auto first  = cudf::detail::tdigest::make_empty_tdigest_column();
  auto second = cudf::detail::tdigest::make_empty_tdigest_column();
  auto third  = cudf::detail::tdigest::make_empty_tdigest_column();

  std::vector<column_view> empty_views{*first, *second, *third};
  auto values = cudf::concatenate(empty_views);

  auto const delta = 1000;
  auto result      = merge_op(*values, delta);

  auto expected = cudf::detail::tdigest::make_empty_tdigest_column();

  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}

} // namespace test
} // namespace cudf
2 changes: 0 additions & 2 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <quantiles/tdigest/tdigest.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>
Expand Down
Loading

0 comments on commit 6a2d50e

Please sign in to comment.