From ba763105e006494a536c1a2fafc5112ab3dae362 Mon Sep 17 00:00:00 2001
From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com>
Date: Fri, 24 Sep 2021 09:37:11 -0500
Subject: [PATCH] Support for using tdigests to compute approximate
percentiles. (#8983)
Addresses https://github.com/rapidsai/cudf/issues/7170
Adds 3 pieces of new functionality:
- A `TDIGEST` aggregation which creates a tdigest column (https://arxiv.org/pdf/1902.04023.pdf) from a stream of input scalars.
- A `MERGE_TDIGEST` aggregation which merges multiple tdigest columns into a new one.
- a `percentile_approx` function which performs percentile queries on tdigest data.
Also exposes several ::detail functions (`sort`, `merge`, `slice`) in detail headers.
Ready for review. I do need to add more tests though.
Authors:
- https://github.com/nvdbaranec
Approvers:
- AJ Schmidt (https://github.com/ajschmidt8)
- Jake Hemstad (https://github.com/jrhemstad)
- MithunR (https://github.com/mythrocks)
- Robert Maynard (https://github.com/robertmaynard)
URL: https://github.com/rapidsai/cudf/pull/8983
---
conda/recipes/libcudf/meta.yaml | 1 +
cpp/CMakeLists.txt | 4 +-
cpp/include/cudf/aggregation.hpp | 79 +-
.../cudf/detail/aggregation/aggregation.hpp | 76 ++
cpp/include/cudf/detail/copy.hpp | 9 +
cpp/include/cudf/detail/merge.cuh | 17 +
cpp/include/cudf/detail/quantiles.hpp | 18 +-
cpp/include/cudf/detail/sorting.hpp | 16 +-
cpp/include/cudf/detail/tdigest/tdigest.hpp | 79 ++
cpp/include/cudf/quantiles.hpp | 28 +
cpp/include/cudf/sorting.hpp | 6 +-
cpp/include/cudf_test/column_utilities.hpp | 7 +-
cpp/src/aggregation/aggregation.cpp | 41 +
cpp/src/copying/slice.cu | 34 +-
cpp/src/groupby/sort/aggregate.cpp | 91 ++
cpp/src/groupby/sort/group_reductions.hpp | 88 ++
cpp/src/groupby/sort/group_tdigest.cu | 841 ++++++++++++++++++
cpp/src/quantiles/tdigest/tdigest.cu | 383 ++++++++
cpp/src/sort/sort.cu | 8 +-
cpp/src/sort/stable_sort.cu | 4 +-
cpp/tests/CMakeLists.txt | 2 +
cpp/tests/groupby/groupby_test_util.hpp | 55 ++
cpp/tests/groupby/tdigest_tests.cu | 584 ++++++++++++
cpp/tests/quantiles/percentile_approx_test.cu | 435 +++++++++
cpp/tests/utilities/column_utilities.cu | 61 +-
25 files changed, 2919 insertions(+), 48 deletions(-)
create mode 100644 cpp/include/cudf/detail/tdigest/tdigest.hpp
create mode 100644 cpp/src/groupby/sort/group_tdigest.cu
create mode 100644 cpp/src/quantiles/tdigest/tdigest.cu
create mode 100644 cpp/tests/groupby/tdigest_tests.cu
create mode 100644 cpp/tests/quantiles/percentile_approx_test.cu
diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml
index c3450fe8d88..fd687de6698 100644
--- a/conda/recipes/libcudf/meta.yaml
+++ b/conda/recipes/libcudf/meta.yaml
@@ -93,6 +93,7 @@ test:
- test -f $PREFIX/include/cudf/detail/sequence.hpp
- test -f $PREFIX/include/cudf/detail/sorting.hpp
- test -f $PREFIX/include/cudf/detail/stream_compaction.hpp
+ - test -f $PREFIX/include/cudf/detail/tdigest/tdigest.hpp
- test -f $PREFIX/include/cudf/detail/transform.hpp
- test -f $PREFIX/include/cudf/detail/transpose.hpp
- test -f $PREFIX/include/cudf/detail/unary.hpp
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 2df35aa0971..00af1973cfe 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -236,8 +236,9 @@ add_library(cudf
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_rank_scan.cu
- src/groupby/sort/group_sum_scan.cu
src/groupby/sort/group_replace_nulls.cu
+ src/groupby/sort/group_sum_scan.cu
+ src/groupby/sort/group_tdigest.cu
src/groupby/sort/sort_helper.cu
src/hash/hashing.cu
src/hash/md5_hash.cu
@@ -318,6 +319,7 @@ add_library(cudf
src/merge/merge.cu
src/partitioning/partitioning.cu
src/partitioning/round_robin.cu
+ src/quantiles/tdigest/tdigest.cu
src/quantiles/quantile.cu
src/quantiles/quantiles.cu
src/reductions/all.cu
diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp
index c302895880d..fb6401a3cc1 100644
--- a/cpp/include/cudf/aggregation.hpp
+++ b/cpp/include/cudf/aggregation.hpp
@@ -87,7 +87,9 @@ class aggregation {
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
- MERGE_M2 ///< merge partial values of M2 aggregation
+ MERGE_M2, ///< merge partial values of M2 aggregation
+ TDIGEST, ///< create a tdigest from a set of input values
+ MERGE_TDIGEST ///< create a tdigest by merging multiple tdigests together
};
aggregation() = delete;
@@ -493,5 +495,80 @@ std::unique_ptr make_merge_sets_aggregation(null_equality nulls_equal = nu
template
std::unique_ptr make_merge_m2_aggregation();
+/**
+ * @brief Factory to create a TDIGEST aggregation
+ *
+ * Produces a tdigest (https://arxiv.org/pdf/1902.04023.pdf) column from input values.
+ * The input aggregation values are expected to be fixed-width numeric types.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ *
+ * @param max_centroids Parameter controlling compression level and accuracy on subsequent
+ * queries on the output tdigest data. `max_centroids` places an upper bound on the size of
+ * the computed tdigests: A value of 1000 will result in a tdigest containing no
+ * more than 1000 centroids (32 bytes each). Higher values result in more accurate tdigest information.
+ *
+ * @returns A TDIGEST aggregation object.
+ */
+template
+std::unique_ptr make_tdigest_aggregation(int max_centroids = 1000);
+
+/**
+ * @brief Factory to create a MERGE_TDIGEST aggregation
+ *
+ * Merges the results from a previous aggregation resulting from a `make_tdigest_aggregation`
+ * or `make_merge_tdigest_aggregation` to produce a new tdigest
+ * (https://arxiv.org/pdf/1902.04023.pdf) column.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ *
+ * @param max_centroids Parameter controlling compression level and accuracy on subsequent
+ * queries on the output tdigest data. `max_centroids` places an upper bound on the size of
+ * the computed tdigests: A value of 1000 will result in a tdigest containing no
+ * more than 1000 centroids (32 bytes each). Higher values result in more accurate tdigest information.
+ *
+ * @returns A MERGE_TDIGEST aggregation object.
+ */
+template
+std::unique_ptr make_merge_tdigest_aggregation(int max_centroids = 1000);
+
/** @} */ // end of group
} // namespace cudf
diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp
index 5a1fc3b9398..05d1bf3e595 100644
--- a/cpp/include/cudf/detail/aggregation/aggregation.hpp
+++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -91,6 +91,10 @@ class simple_aggregations_collector { // Declares the interface for the simple
class merge_sets_aggregation const& agg);
virtual std::vector> visit(data_type col_type,
class merge_m2_aggregation const& agg);
+ virtual std::vector> visit(data_type col_type,
+ class tdigest_aggregation const& agg);
+ virtual std::vector> visit(
+ data_type col_type, class merge_tdigest_aggregation const& agg);
};
class aggregation_finalizer { // Declares the interface for the finalizer
@@ -125,6 +129,8 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class merge_lists_aggregation const& agg);
virtual void visit(class merge_sets_aggregation const& agg);
virtual void visit(class merge_m2_aggregation const& agg);
+ virtual void visit(class tdigest_aggregation const& agg);
+ virtual void visit(class merge_tdigest_aggregation const& agg);
};
/**
@@ -884,6 +890,54 @@ class merge_m2_aggregation final : public groupby_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};
+/**
+ * @brief Derived aggregation class for specifying TDIGEST aggregation
+ */
+class tdigest_aggregation final : public groupby_aggregation {
+ public:
+ explicit tdigest_aggregation(int max_centroids_)
+ : aggregation{TDIGEST}, max_centroids{max_centroids_}
+ {
+ }
+
+ int const max_centroids;
+
+ std::unique_ptr clone() const override
+ {
+ return std::make_unique(*this);
+ }
+ std::vector> get_simple_aggregations(
+ data_type col_type, simple_aggregations_collector& collector) const override
+ {
+ return collector.visit(col_type, *this);
+ }
+ void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
+/**
+ * @brief Derived aggregation class for specifying MERGE_TDIGEST aggregation
+ */
+class merge_tdigest_aggregation final : public groupby_aggregation {
+ public:
+ explicit merge_tdigest_aggregation(int max_centroids_)
+ : aggregation{MERGE_TDIGEST}, max_centroids{max_centroids_}
+ {
+ }
+
+ int const max_centroids;
+
+ std::unique_ptr clone() const override
+ {
+ return std::make_unique(*this);
+ }
+ std::vector> get_simple_aggregations(
+ data_type col_type, simple_aggregations_collector& collector) const override
+ {
+ return collector.visit(col_type, *this);
+ }
+ void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
+};
+
/**
* @brief Sentinel value used for `ARGMAX` aggregation.
*
@@ -1120,6 +1174,24 @@ struct target_type_impl {
using type = struct_view;
};
+// Always use numeric types for TDIGEST
+template
+struct target_type_impl() || is_fixed_point())>> {
+ using type = struct_view;
+};
+
+// MERGE_TDIGEST. The root column type for a tdigest column is a list_view. Strictly
+// speaking, this check is not sufficient to guarantee we are actually being given a
+// real tdigest column, but we will do further verification inside the aggregation code.
+template
+struct target_type_impl>> {
+ using type = struct_view;
+};
+
/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
@@ -1224,6 +1296,10 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
return f.template operator()(std::forward(args)...);
case aggregation::MERGE_M2:
return f.template operator()(std::forward(args)...);
+ case aggregation::TDIGEST:
+ return f.template operator()(std::forward(args)...);
+ case aggregation::MERGE_TDIGEST:
+ return f.template operator()(std::forward(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
diff --git a/cpp/include/cudf/detail/copy.hpp b/cpp/include/cudf/detail/copy.hpp
index fb5cfad6186..9f06661c8d1 100644
--- a/cpp/include/cudf/detail/copy.hpp
+++ b/cpp/include/cudf/detail/copy.hpp
@@ -75,6 +75,15 @@ std::vector slice(column_view const& input,
std::vector const& indices,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);
+/**
+ * @copydoc cudf::slice(table_view const&,std::vector const&)
+ *
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ */
+std::vector slice(table_view const& input,
+ std::vector const& indices,
+ rmm::cuda_stream_view stream = rmm::cuda_stream_default);
+
/**
* @copydoc cudf::shift(column_view const&,size_type,scalar const&,
* rmm::mr::device_memory_resource*)
diff --git a/cpp/include/cudf/detail/merge.cuh b/cpp/include/cudf/detail/merge.cuh
index a779c3defbb..ec83e348e33 100644
--- a/cpp/include/cudf/detail/merge.cuh
+++ b/cpp/include/cudf/detail/merge.cuh
@@ -145,5 +145,22 @@ struct row_lexicographic_tagged_comparator {
order const* _column_order{};
};
+/**
+ * @copydoc std::unique_ptr merge(
+ * std::vector const& tables_to_merge,
+ * std::vector const& key_cols,
+ * std::vector const& column_order,
+ * std::vector const& null_precedence,
+ * rmm::mr::device_memory_resource* mr)
+ *
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ */
+std::unique_ptr merge(std::vector const& tables_to_merge,
+ std::vector const& key_cols,
+ std::vector const& column_order,
+ std::vector const& null_precedence,
+ rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr);
+
} // namespace detail
} // namespace cudf
diff --git a/cpp/include/cudf/detail/quantiles.hpp b/cpp/include/cudf/detail/quantiles.hpp
index 5fb2ce4cbe6..7a76f9cab88 100644
--- a/cpp/include/cudf/detail/quantiles.hpp
+++ b/cpp/include/cudf/detail/quantiles.hpp
@@ -22,7 +22,8 @@
namespace cudf {
namespace detail {
-/** @copydoc cudf::quantile()
+/**
+ * @copydoc cudf::quantile()
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
@@ -35,7 +36,8 @@ std::unique_ptr quantile(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
-/** @copydoc cudf::quantiles()
+/**
+ * @copydoc cudf::quantiles()
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
@@ -49,5 +51,17 @@ std::unique_ptr quantiles(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+/**
+ * @copydoc cudf::percentile_approx(column_view const&, column_view const&,
+ * rmm::mr::device_memory_resource*)
+ *
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ */
+std::unique_ptr percentile_approx(
+ column_view const& input,
+ column_view const& percentiles,
+ rmm::cuda_stream_view stream = rmm::cuda_stream_default,
+ rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
} // namespace detail
} // namespace cudf
diff --git a/cpp/include/cudf/detail/sorting.hpp b/cpp/include/cudf/detail/sorting.hpp
index 3127a5f89f1..b5dfb34c043 100644
--- a/cpp/include/cudf/detail/sorting.hpp
+++ b/cpp/include/cudf/detail/sorting.hpp
@@ -32,7 +32,7 @@ namespace detail {
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr sorted_order(
- table_view input,
+ table_view const& input,
std::vector const& column_order = {},
std::vector const& null_precedence = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
@@ -44,7 +44,7 @@ std::unique_ptr sorted_order(
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr stable_sorted_order(
- table_view input,
+ table_view const& input,
std::vector const& column_order = {},
std::vector const& null_precedence = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
@@ -90,5 +90,17 @@ std::unique_ptr segmented_sort_by_key(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+/**
+ * @copydoc cudf::sort
+ *
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ */
+std::unique_ptr sort(
+ table_view const& values,
+ std::vector const& column_order = {},
+ std::vector const& null_precedence = {},
+ rmm::cuda_stream_view stream = rmm::cuda_stream_default,
+ rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
} // namespace detail
} // namespace cudf
diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp
new file mode 100644
index 00000000000..94c22911c1e
--- /dev/null
+++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include
+
+namespace cudf {
+namespace detail {
+
+namespace tdigest {
+
+// mean and weight column indices within tdigest inner struct columns
+constexpr size_type mean_column_index = 0;
+constexpr size_type weight_column_index = 1;
+
+// min and max column indices within tdigest outer struct columns
+constexpr size_type centroid_column_index = 0;
+constexpr size_type min_column_index = 1;
+constexpr size_type max_column_index = 2;
+
+/**
+ * @brief Verifies that the input column is a valid tdigest column.
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ *
+ * @param col Column to be checked
+ *
+ * @throws cudf::logic_error if the column is not a valid tdigest column.
+ */
+void check_is_valid_tdigest_column(column_view const& col);
+
+/**
+ * @brief Create an empty tdigest column.
+ *
+ * An empty tdigest column contains a single row of length 0
+ *
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned column's device memory.
+ *
+ * @returns An empty tdigest column.
+ */
+std::unique_ptr make_empty_tdigest_column(
+ rmm::cuda_stream_view stream = rmm::cuda_stream_default,
+ rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
+} // namespace tdigest
+} // namespace detail
+} // namespace cudf
\ No newline at end of file
diff --git a/cpp/include/cudf/quantiles.hpp b/cpp/include/cudf/quantiles.hpp
index 94b5c344f4f..d21f6dff79c 100644
--- a/cpp/include/cudf/quantiles.hpp
+++ b/cpp/include/cudf/quantiles.hpp
@@ -17,6 +17,7 @@
#pragma once
#include
+#include
#include
#include
@@ -94,5 +95,32 @@ std::unique_ptr quantiles(
std::vector const& null_precedence = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+/**
+ * @brief Calculate approximate percentiles on an input tdigest column.
+ *
+ * tdigest (https://arxiv.org/pdf/1902.04023.pdf) columns are produced specifically
+ * by the TDIGEST and MERGE_TDIGEST aggregations. These columns represent
+ * compressed representations of a very large input data set that can be
+ * queried for quantile information.
+ *
+ * Produces a LIST column where each row `i` represents output from querying the
+ * corresponding tdigest from `input` row `i`. The length of each output list
+ * is the number of percentiles specified in `percentiles`.
+ *
+ * @param input tdigest input data. One tdigest per row.
+ * @param percentiles Desired percentiles in range [0, 1].
+ * @param mr Device memory resource used to allocate the returned column's device
+ * memory
+ *
+ * @throws cudf::logic_error if `input` is not a valid tdigest column.
+ * @throws cudf::logic_error if `percentiles` is not a FLOAT64 column.
+ *
+ * @returns LIST Column containing requested percentile values as FLOAT64.
+ */
+std::unique_ptr percentile_approx(
+ structs_column_view const& input,
+ column_view const& percentiles,
+ rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
/** @} */ // end of group
} // namespace cudf
diff --git a/cpp/include/cudf/sorting.hpp b/cpp/include/cudf/sorting.hpp
index 36a8131a78e..69eb8b3490a 100644
--- a/cpp/include/cudf/sorting.hpp
+++ b/cpp/include/cudf/sorting.hpp
@@ -58,7 +58,7 @@ enum class rank_method {
* `input` if it were sorted
*/
std::unique_ptr sorted_order(
- table_view input,
+ table_view const& input,
std::vector const& column_order = {},
std::vector const& null_precedence = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
@@ -72,7 +72,7 @@ std::unique_ptr sorted_order(
* @copydoc cudf::sorted_order
*/
std::unique_ptr stable_sorted_order(
- table_view input,
+ table_view const& input,
std::vector const& column_order = {},
std::vector const& null_precedence = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
@@ -112,7 +112,7 @@ bool is_sorted(cudf::table_view const& table,
* @return New table containing the desired sorted order of `input`
*/
std::unique_ptr sort(
- table_view input,
+ table_view const& input,
std::vector const& column_order = {},
std::vector const& null_precedence = {},
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
diff --git a/cpp/include/cudf_test/column_utilities.hpp b/cpp/include/cudf_test/column_utilities.hpp
index 553d8a97bd2..aa77686fee4 100644
--- a/cpp/include/cudf_test/column_utilities.hpp
+++ b/cpp/include/cudf_test/column_utilities.hpp
@@ -38,6 +38,8 @@ enum class debug_output_level {
QUIET // no debug output
};
+constexpr size_type default_ulp = 4;
+
/**
* @brief Verifies the property equality of two columns.
*
@@ -93,12 +95,15 @@ bool expect_columns_equal(cudf::column_view const& lhs,
* @param lhs The first column
* @param rhs The second column
* @param verbosity Level of debug output verbosity
+ * @param fp_ulps # of ulps of tolerance to allow when comparing
+ * floating point values
*
* @returns True if the columns (and their properties) are equivalent, false otherwise
*/
bool expect_columns_equivalent(cudf::column_view const& lhs,
cudf::column_view const& rhs,
- debug_output_level verbosity = debug_output_level::FIRST_ERROR);
+ debug_output_level verbosity = debug_output_level::FIRST_ERROR,
+ size_type fp_ulps = cudf::test::default_ulp);
/**
* @brief Verifies the bitwise equality of two device memory buffers.
diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp
index c3d992e1181..b550b61785b 100644
--- a/cpp/src/aggregation/aggregation.cpp
+++ b/cpp/src/aggregation/aggregation.cpp
@@ -202,6 +202,18 @@ std::vector> simple_aggregations_collector::visit(
return visit(col_type, static_cast(agg));
}
+std::vector> simple_aggregations_collector::visit(
+ data_type col_type, tdigest_aggregation const& agg)
+{
+ return visit(col_type, static_cast(agg));
+}
+
+std::vector> simple_aggregations_collector::visit(
+ data_type col_type, merge_tdigest_aggregation const& agg)
+{
+ return visit(col_type, static_cast(agg));
+}
+
// aggregation_finalizer ----------------------------------------
void aggregation_finalizer::visit(aggregation const& agg) {}
@@ -346,6 +358,16 @@ void aggregation_finalizer::visit(merge_m2_aggregation const& agg)
visit(static_cast(agg));
}
+void aggregation_finalizer::visit(tdigest_aggregation const& agg)
+{
+ visit(static_cast(agg));
+}
+
+void aggregation_finalizer::visit(merge_tdigest_aggregation const& agg)
+{
+ visit(static_cast(agg));
+}
+
} // namespace detail
std::vector> aggregation::get_simple_aggregations(
@@ -668,6 +690,25 @@ std::unique_ptr make_merge_m2_aggregation()
template std::unique_ptr make_merge_m2_aggregation();
template std::unique_ptr make_merge_m2_aggregation();
+template
+std::unique_ptr make_tdigest_aggregation(int max_centroids)
+{
+ return std::make_unique(max_centroids);
+}
+template std::unique_ptr make_tdigest_aggregation(int max_centroids);
+template std::unique_ptr make_tdigest_aggregation(
+ int max_centroids);
+
+template
+std::unique_ptr make_merge_tdigest_aggregation(int max_centroids)
+{
+ return std::make_unique(max_centroids);
+}
+template std::unique_ptr make_merge_tdigest_aggregation(
+ int max_centroids);
+template std::unique_ptr make_merge_tdigest_aggregation(
+ int max_centroids);
+
namespace detail {
namespace {
struct target_type_functor {
diff --git a/cpp/src/copying/slice.cu b/cpp/src/copying/slice.cu
index 0e41689dc4b..d1c12056393 100644
--- a/cpp/src/copying/slice.cu
+++ b/cpp/src/copying/slice.cu
@@ -63,17 +63,9 @@ std::vector slice(column_view const& input,
return std::vector{begin, begin + indices.size() / 2};
}
-} // namespace detail
-
-std::vector slice(cudf::column_view const& input,
- std::vector const& indices)
-{
- CUDF_FUNC_RANGE();
- return detail::slice(input, indices, rmm::cuda_stream_default);
-}
-
-std::vector slice(cudf::table_view const& input,
- std::vector const& indices)
+std::vector slice(table_view const& input,
+ std::vector const& indices,
+ rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
CUDF_EXPECTS(indices.size() % 2 == 0, "indices size must be even");
@@ -81,7 +73,7 @@ std::vector slice(cudf::table_view const& input,
// 2d arrangement of column_views that represent the outgoing table_views sliced_table[i][j]
// where i is the i'th column of the j'th table_view
- auto op = [&indices](auto const& c) { return cudf::slice(c, indices); };
+ auto op = [&indices, stream](auto const& c) { return cudf::detail::slice(c, indices, stream); };
auto f = thrust::make_transform_iterator(input.begin(), op);
auto sliced_table = std::vector>(f, f + input.num_columns());
@@ -99,6 +91,22 @@ std::vector slice(cudf::table_view const& input,
}
return result;
-};
+}
+
+} // namespace detail
+
+std::vector slice(cudf::column_view const& input,
+ std::vector const& indices)
+{
+ CUDF_FUNC_RANGE();
+ return detail::slice(input, indices, rmm::cuda_stream_default);
+}
+
+std::vector slice(cudf::table_view const& input,
+ std::vector const& indices)
+{
+ CUDF_FUNC_RANGE();
+ return detail::slice(input, indices, rmm::cuda_stream_default);
+}
} // namespace cudf
diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp
index 726b51b7702..9f3d67ac38b 100644
--- a/cpp/src/groupby/sort/aggregate.cpp
+++ b/cpp/src/groupby/sort/aggregate.cpp
@@ -525,6 +525,97 @@ void aggregate_result_functor::operator()(aggregation con
get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr));
};
+/**
+ * @brief Generate a tdigest column from a grouped set of numeric input values.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ */
+template <>
+void aggregate_result_functor::operator()(aggregation const& agg)
+{
+ if (cache.has_result(col_idx, agg)) { return; }
+
+ auto const max_centroids =
+ dynamic_cast(agg).max_centroids;
+
+ auto count_agg = make_count_aggregation();
+ operator()(*count_agg);
+ column_view valid_counts = cache.get_result(col_idx, *count_agg);
+
+ cache.add_result(col_idx,
+ agg,
+ detail::group_tdigest(
+ get_sorted_values(),
+ helper.group_offsets(stream),
+ helper.group_labels(stream),
+ {valid_counts.begin(), static_cast(valid_counts.size())},
+ helper.num_groups(stream),
+ max_centroids,
+ stream,
+ mr));
+};
+
+/**
+ * @brief Generate a merged tdigest column from a grouped set of input tdigest columns.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ */
+template <>
+void aggregate_result_functor::operator()(aggregation const& agg)
+{
+ if (cache.has_result(col_idx, agg)) { return; }
+
+ auto const max_centroids =
+ dynamic_cast(agg).max_centroids;
+ cache.add_result(col_idx,
+ agg,
+ detail::group_merge_tdigest(get_grouped_values(),
+ helper.group_offsets(stream),
+ helper.group_labels(stream),
+ helper.num_groups(stream),
+ max_centroids,
+ stream,
+ mr));
+};
+
} // namespace detail
// Sort-based groupby
diff --git a/cpp/src/groupby/sort/group_reductions.hpp b/cpp/src/groupby/sort/group_reductions.hpp
index 2770162da2d..cb01ee8e053 100644
--- a/cpp/src/groupby/sort/group_reductions.hpp
+++ b/cpp/src/groupby/sort/group_reductions.hpp
@@ -442,6 +442,94 @@ std::unique_ptr group_merge_m2(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
+/**
+ * @brief Generate a tdigest column from a grouped set of numeric input values.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ *
+ * @param values Grouped (and sorted) values to merge.
+ * @param group_offsets Offsets of groups' starting points within @p values.
+ * @param group_labels 0-based ID of group that the corresponding value belongs to
+ * @param group_valid_counts Per-group counts of valid elements.
+ * @param num_groups Number of groups.
+ * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
+ * values result in a larger, more precise tdigest.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns tdigest column, with 1 tdigest per row
+ */
+std::unique_ptr group_tdigest(column_view const& values,
+ cudf::device_span group_offsets,
+ cudf::device_span group_labels,
+ cudf::device_span group_valid_counts,
+ size_type num_groups,
+ int max_centroids,
+ rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr);
+
+/**
+ * @brief Merges tdigests within the same group to generate a new tdigest.
+ *
+ * The tdigest column produced is of the following structure:
+ *
+ * struct {
+ * // centroids for the digest
+ * list {
+ * struct {
+ * double // mean
+ * double // weight
+ * },
+ * ...
+ * }
+ * // these are from the input stream, not the centroids. they are used
+ * // during the percentile_approx computation near the beginning or
+ * // end of the quantiles
+ * double // min
+ * double // max
+ * }
+ *
+ * Each output row is a single tdigest. The length of the row is the "size" of the
+ * tdigest, each element of which represents a weighted centroid (mean, weight).
+ *
+ * @param values Grouped tdigests to merge.
+ * @param group_offsets Offsets of groups' starting points within @p values.
+ * @param group_labels 0-based ID of group that the corresponding value belongs to
+ * @param num_groups Number of groups.
+ * @param max_centroids Parameter controlling the level of compression of the tdigest. Higher
+ * values result in a larger, more precise tdigest.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns tdigest column, with 1 tdigest per row
+ */
+std::unique_ptr group_merge_tdigest(column_view const& values,
+ cudf::device_span group_offsets,
+ cudf::device_span group_labels,
+ size_type num_groups,
+ int max_centroids,
+ rmm::cuda_stream_view stream,
+ rmm::mr::device_memory_resource* mr);
+
/** @endinternal
*
*/
diff --git a/cpp/src/groupby/sort/group_tdigest.cu b/cpp/src/groupby/sort/group_tdigest.cu
new file mode 100644
index 00000000000..5b4252a9063
--- /dev/null
+++ b/cpp/src/groupby/sort/group_tdigest.cu
@@ -0,0 +1,841 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+
+namespace cudf {
+namespace groupby {
+namespace detail {
+
+namespace {
+
// the most representative point within a cluster of similar
// values. {mean, weight, validity}
// NOTE: Using a tuple here instead of a struct to take advantage of
// thrust zip iterators for output.
using centroid = thrust::tuple<double, double, bool>;

// make a centroid from a scalar with a weight of 1.
template <typename T>
struct make_centroid {
  column_device_view const col;  // grouped input values

  // produce the centroid {value, weight == 1, validity} for the input row at `index`.
  // .element<T>() handles fixed-point -> double conversion for us.
  centroid operator() __device__(size_type index)
  {
    return {static_cast<double>(col.element<T>(index)), 1, col.is_valid(index)};
  }
};
+
// make a centroid from an input stream of mean/weight values. centroids built
// this way are always considered valid.
struct make_weighted_centroid {
  double const* mean;    // stream of means
  double const* weight;  // stream of weights

  // build the valid centroid {mean[i], weight[i], true} for row i
  centroid operator() __device__(size_type i)
  {
    return centroid{mean[i], weight[i], true};
  }
};
+
// merge two centroids into one, producing the weighted average of their means.
// an invalid centroid contributes nothing; merging two invalid centroids yields
// an invalid, zeroed centroid.
struct merge_centroids {
  centroid operator() __device__(centroid const& lhs, centroid const& rhs)
  {
    bool const lhs_valid = thrust::get<2>(lhs);
    bool const rhs_valid = thrust::get<2>(rhs);
    if (!lhs_valid) { return rhs_valid ? rhs : centroid{0, 0, false}; }
    if (!rhs_valid) { return lhs; }

    // both valid: weight-average the means, sum the weights.
    double const combined_weight = thrust::get<1>(lhs) + thrust::get<1>(rhs);
    double const combined_mean =
      (thrust::get<0>(lhs) * thrust::get<1>(lhs) + thrust::get<0>(rhs) * thrust::get<1>(rhs)) /
      combined_weight;
    return {combined_mean, combined_weight, true};
  }
};
+
/**
 * @brief A functor which returns the nearest cumulative weight in the input stream prior to the
 * specified next weight limit.
 *
 * This functor assumes the weight for all scalars is simply 1. Under this assumption,
 * the nearest weight that will be <= the next limit is simply the nearest integer < the limit,
 * which we can get by just taking floor(next_limit). For example if our next limit is 3.56, the
 * nearest whole number <= it is floor(3.56) == 3.
 */
struct nearest_value_scalar_weights {
  // returns {nearest cumulative weight <= next_limit, index of that input value}
  thrust::pair<double, int> operator() __device__(double next_limit, size_type)
  {
    double const f = floor(next_limit);
    // with unit weights, the value index is the cumulative weight - 1, clamped at 0
    return {f, max(0, static_cast<int>(next_limit) - 1)};
  }
};
+
/**
 * @brief A functor which returns the nearest cumulative weight in the input stream prior to the
 * specified next weight limit.
 *
 * This functor assumes we are dealing with grouped, sorted, weighted centroids.
 */
struct nearest_value_centroid_weights {
  double const* cumulative_weights;  // cumulative weights of the grouped centroids
  offset_type const* outer_offsets;  // groups
  offset_type const* inner_offsets;  // tdigests within a group

  // returns {nearest cumulative weight <= next_limit, index of that centroid within the group}
  thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index)
  {
    auto const tdigest_begin = outer_offsets[group_index];
    auto const tdigest_end   = outer_offsets[group_index + 1];
    auto const num_weights   = inner_offsets[tdigest_end] - inner_offsets[tdigest_begin];
    // cumulative weights for this group only
    double const* group_cumulative_weights = cumulative_weights + inner_offsets[tdigest_begin];

    auto const index = ((thrust::lower_bound(thrust::seq,
                                             group_cumulative_weights,
                                             group_cumulative_weights + num_weights,
                                             next_limit)) -
                        group_cumulative_weights);

    return index == 0 ? thrust::pair<double, int>{0, 0}
                      : thrust::pair<double, int>{group_cumulative_weights[index - 1],
                                                  static_cast<int>(index) - 1};
  }
};
+
/**
 * @brief A functor which returns the cumulative input weight for a given index in a
 * set of grouped input values.
 *
 * This functor assumes the weight for all scalars is simply 1. Under this assumption,
 * the cumulative weight for a given value index I is simply I+1.
 */
struct cumulative_scalar_weight {
  cudf::device_span<size_type const> group_offsets;
  cudf::device_span<size_type const> group_labels;

  // returns {group index, relative value index within the group, cumulative weight}
  std::tuple<size_type, size_type, double> operator() __device__(size_type value_index) const
  {
    auto const group_index          = group_labels[value_index];
    auto const relative_value_index = value_index - group_offsets[group_index];
    return {group_index, relative_value_index, relative_value_index + 1};
  }
};
+
/**
 * @brief A functor which returns the cumulative input weight for a given index in a
 * set of grouped input centroids.
 *
 * This functor assumes we are dealing with grouped, weighted centroids.
 */
struct cumulative_centroid_weight {
  double const* cumulative_weights;  // cumulative weights of the grouped centroids
  cudf::device_span<size_type const> group_labels;
  offset_type const* outer_offsets;                    // groups
  cudf::device_span<offset_type const> inner_offsets;  // tdigests within a group

  // returns {group index, relative value index within the group, cumulative weight}
  std::tuple<size_type, size_type, double> operator() __device__(size_type value_index) const
  {
    // which original tdigest this absolute value index falls inside
    auto const tdigest_index =
      static_cast<size_type>(
        thrust::upper_bound(thrust::seq, inner_offsets.begin(), inner_offsets.end(), value_index) -
        inner_offsets.begin()) -
      1;
    auto const group_index          = group_labels[tdigest_index];
    auto const first_tdigest_index  = outer_offsets[group_index];
    auto const first_weight_index   = inner_offsets[first_tdigest_index];
    auto const relative_value_index = value_index - first_weight_index;
    // cumulative weights for this group only
    double const* group_cumulative_weights = cumulative_weights + first_weight_index;

    return {group_index, relative_value_index, group_cumulative_weights[relative_value_index]};
  }
};
+
// a monotonically increasing scale function which produces a distribution
// of centroids that is more densely packed in the middle of the input
// than at the ends.
__device__ double scale_func_k1(double quantile, double delta_norm)
{
  // map the quantile through asin, step by one unit in k-space, and map back
  double const k = delta_norm * asin(2.0 * quantile - 1.0) + 1.0;
  return (sin(k / delta_norm) + 1.0) / 2.0;
}
+
/**
 * @brief Compute a set of cluster limits (brackets, essentially) for a
 * given tdigest based on the specified delta and the total weight of values
 * to be added.
 *
 * The number of clusters generated will always be <= delta_, where delta_ is
 * a reasonably small number likely << 10000.
 *
 * Each input group gets an independent set of clusters generated. 1 thread
 * per group.
 *
 * This kernel is called in a two-pass style. Once to compute the per-group
 * cluster sizes and total # of clusters (group_cluster_wl == nullptr), and once
 * to compute the actual weight limits per cluster.
 *
 * @param delta_ tdigest compression level
 * @param num_groups The number of input groups
 * @param nearest_weight A functor which returns the nearest weight in the input
 * stream that falls before our current cluster limit
 * @param total_weight_ A functor which returns the expected total weight for
 * the entire stream of input values for the specified group.
 * @param cumulative_weight A functor which returns the cumulative weight for an
 * absolute input value index.
 * @param group_cluster_wl Output. The set of cluster weight limits for each group.
 * May be nullptr during the sizing pass.
 * @param group_num_clusters Output. The number of output clusters for each input group.
 * @param group_cluster_offsets Offsets per-group to the start of its clusters
 */
template <typename TotalWeightIter, typename NearestWeightFunc, typename CumulativeWeight>
__global__ void generate_cluster_limits_kernel(int delta_,
                                               size_type num_groups,
                                               NearestWeightFunc nearest_weight,
                                               TotalWeightIter total_weight_,
                                               CumulativeWeight cumulative_weight,
                                               double* group_cluster_wl,
                                               size_type* group_num_clusters,
                                               offset_type const* group_cluster_offsets)
{
  int const tid    = threadIdx.x + blockIdx.x * blockDim.x;
  auto const group_index = tid;
  if (group_index >= num_groups) { return; }

  // we will generate at most delta clusters.
  double const delta        = static_cast<double>(delta_);
  double const delta_norm   = delta / (2.0 * M_PI);
  double const total_weight = total_weight_[group_index];
  group_num_clusters[group_index] = 0;
  // a group with nothing in it.
  if (total_weight <= 0) { return; }

  // start at the correct place based on our cluster offset.
  double* cluster_wl =
    group_cluster_wl ? group_cluster_wl + group_cluster_offsets[group_index] : nullptr;

  double cur_limit        = 0.0;
  double cur_weight       = 0.0;
  double next_limit       = -1.0;
  int last_inserted_index = -1;

  // compute the first cluster limit
  double nearest_w;
  int nearest_w_index;
  while (1) {
    cur_weight = next_limit < 0 ? 0 : max(cur_weight + 1, nearest_w);
    if (cur_weight >= total_weight) { break; }

    // based on where we are closing the cluster off (not including the incoming weight),
    // compute the next cluster limit
    double const quantile = cur_weight / total_weight;
    next_limit            = total_weight * scale_func_k1(quantile, delta_norm);

    // if the next limit is < the cur limit, we're past the end of the distribution, so we're done.
    if (next_limit <= cur_limit) {
      if (cluster_wl) { cluster_wl[group_num_clusters[group_index]] = total_weight; }
      group_num_clusters[group_index]++;
      break;
    }

    // compute the weight we will be at in the input values just before closing off the current
    // cluster (because adding the next value will cross the current limit).
    // NOTE: can't use structured bindings here.
    thrust::tie(nearest_w, nearest_w_index) = nearest_weight(next_limit, group_index);

    if (cluster_wl) {
      // because of the way the scale functions work, it is possible to generate clusters
      // in such a way that we end up with "gaps" where there are no input values that
      // fall into a given cluster. An example would be this:
      //
      // cluster weight limits = 0.00003, 1.008, 3.008
      //
      // input values(weight) = A(1), B(2), C(3)
      //
      // naively inserting these values into the clusters simply by taking a lower_bound,
      // we would get the following distribution of input values into those 3 clusters.
      // (), (A), (B,C)
      //
      // whereas what we really want is:
      //
      // (A), (B), (C)
      //
      // to fix this, we will artificially adjust the output cluster limits to guarantee
      // at least 1 input value will be put in each cluster during the reduction step.
      // this does not affect final centroid results as we still use the "real" weight limits
      // to compute subsequent clusters - the purpose is only to allow cluster selection
      // during the reduction step to be trivial.
      //
      double adjusted_next_limit = next_limit;
      if (nearest_w_index == last_inserted_index || last_inserted_index < 0) {
        nearest_w_index = last_inserted_index + 1;
        auto [r, i, adjusted] = cumulative_weight(nearest_w_index);
        adjusted_next_limit   = max(next_limit, adjusted);
      }
      cluster_wl[group_num_clusters[group_index]] = adjusted_next_limit;
      last_inserted_index                         = nearest_w_index;
    }
    group_num_clusters[group_index]++;
    cur_limit = next_limit;
  }
}
+
/**
 * @brief Compute a set of cluster limits (brackets, essentially) for a
 * given tdigest based on the specified delta and the total weight of values
 * to be added.
 *
 * The number of clusters generated will always be <= delta_, where delta_ is
 * a reasonably small number likely << 10000.
 *
 * Each input group gets an independent set of clusters generated.
 *
 * @param delta tdigest compression level
 * @param num_groups The number of input groups
 * @param nearest_weight A functor which returns the nearest weight in the input
 * stream that falls before our current cluster limit
 * @param total_weight A functor which returns the expected total weight for
 * the entire stream of input values for the specified group.
 * @param cumulative_weight A functor which returns the cumulative weight for an
 * absolute input value index.
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @param mr Device memory resource used to allocate the returned column's device memory
 *
 * @returns A tuple containing the set of cluster weight limits for each group, a set of
 * list-style offsets indicating group sizes, and the total number of clusters
 */
template <typename NearestWeight, typename TotalWeightIter, typename CumulativeWeight>
std::tuple<rmm::device_uvector<double>, std::unique_ptr<column>, size_type>
generate_group_cluster_info(int delta,
                            size_type num_groups,
                            NearestWeight nearest_weight,
                            TotalWeightIter total_weight,
                            CumulativeWeight cumulative_weight,
                            rmm::cuda_stream_view stream,
                            rmm::mr::device_memory_resource* mr)
{
  constexpr size_type block_size = 256;
  cudf::detail::grid_1d const grid(num_groups, block_size);

  // compute number of clusters per group
  // each thread computes 1 set of clusters (# of cluster sets == # of groups)
  rmm::device_uvector<size_type> group_num_clusters(num_groups, stream);
  // sizing pass: group_cluster_wl == nullptr, only group_num_clusters is filled in
  generate_cluster_limits_kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
    delta,
    num_groups,
    nearest_weight,
    total_weight,
    cumulative_weight,
    nullptr,
    group_num_clusters.begin(),
    nullptr);

  // generate group cluster offsets (where the clusters for a given group start and end)
  auto group_cluster_offsets = cudf::make_fixed_width_column(
    data_type{type_id::INT32}, num_groups + 1, mask_state::UNALLOCATED, stream, mr);
  auto cluster_size = cudf::detail::make_counting_transform_iterator(
    0, [group_num_clusters = group_num_clusters.begin(), num_groups] __device__(size_type index) {
      // final entry is 0 so the exclusive_scan produces list-style offsets
      return index == num_groups ? 0 : group_num_clusters[index];
    });
  thrust::exclusive_scan(rmm::exec_policy(stream),
                         cluster_size,
                         cluster_size + num_groups + 1,
                         group_cluster_offsets->mutable_view().begin<offset_type>(),
                         0);

  // total # of clusters
  offset_type const total_clusters =
    cudf::detail::get_value<offset_type>(group_cluster_offsets->view(), num_groups, stream);

  // fill in the actual cluster weight limits
  rmm::device_uvector<double> group_cluster_wl(total_clusters, stream);
  generate_cluster_limits_kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
    delta,
    num_groups,
    nearest_weight,
    total_weight,
    cumulative_weight,
    group_cluster_wl.begin(),
    group_num_clusters.begin(),
    group_cluster_offsets->view().begin<offset_type>());

  return {std::move(group_cluster_wl),
          std::move(group_cluster_offsets),
          static_cast<size_type>(total_clusters)};
}
+
/**
 * @brief Compute a column of tdigests.
 *
 * Assembles the output tdigest column based on the specified delta, a stream of
 * input values (either scalar or centroids), and an assortment of per-group
 * clustering information.
 *
 * This function is effectively just a reduce_by_key that performs a reduction
 * from input values -> centroid clusters as defined by the cluster weight
 * boundaries.
 *
 * @param delta tdigest compression level
 * @param centroids_begin Beginning of the range of input centroids.
 * @param centroids_end End of the range of input centroids.
 * @param group_cumulative_weight Functor which returns cumulative weight and group information
 * for an absolute input value index.
 * @param min_col Column containing the minimum value per group.
 * @param max_col Column containing the maximum value per group.
 * @param group_cluster_wl Cluster weight limits for each group.
 * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
 * @param total_clusters Total number of clusters in all groups.
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @param mr Device memory resource used to allocate the returned column's device memory
 *
 * @returns A tdigest column with 1 row per output tdigest.
 */
template <typename CentroidIter, typename CumulativeWeight>
std::unique_ptr<column> compute_tdigests(int delta,
                                         CentroidIter centroids_begin,
                                         CentroidIter centroids_end,
                                         CumulativeWeight group_cumulative_weight,
                                         std::unique_ptr<column>&& min_col,
                                         std::unique_ptr<column>&& max_col,
                                         rmm::device_uvector<double> const& group_cluster_wl,
                                         std::unique_ptr<column>&& group_cluster_offsets,
                                         size_type total_clusters,
                                         rmm::cuda_stream_view stream,
                                         rmm::mr::device_memory_resource* mr)
{
  // the output for each group is a column of data that represents the tdigest. since we want 1 row
  // per group, each row will be a list the length of the tdigest for that group. so our output
  // column is of the form:
  // struct {
  //   centroids for the digest
  //   list {
  //    struct {
  //      double    // mean
  //      double    // weight
  //    }
  //   }
  //   double       // min
  //   double       // max
  // }
  //
  if (total_clusters == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); }
  std::vector<std::unique_ptr<column>> inner_children;
  // mean
  inner_children.push_back(cudf::make_fixed_width_column(
    data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr));
  // weight
  inner_children.push_back(cudf::make_fixed_width_column(
    data_type{type_id::FLOAT64}, total_clusters, mask_state::UNALLOCATED, stream, mr));
  // tdigest struct
  auto tdigests =
    cudf::make_structs_column(total_clusters, std::move(inner_children), 0, {}, stream, mr);

  // each input group represents an individual tdigest. within each tdigest, we want the keys
  // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
  // into the range 0-99). But since we have multiple tdigests, we need to keep the keys unique
  // between the groups, so we add our group start offset.
  auto keys = thrust::make_transform_iterator(
    thrust::make_counting_iterator(0),
    [group_cluster_wl      = group_cluster_wl.data(),
     group_cluster_offsets = group_cluster_offsets->view().begin<offset_type>(),
     group_cumulative_weight] __device__(size_type value_index) -> size_type {
      auto [group_index, relative_value_index, cumulative_weight] =
        group_cumulative_weight(value_index);

      // compute start of cluster weight limits for this group
      double const* weight_limits = group_cluster_wl + group_cluster_offsets[group_index];
      auto const num_clusters =
        group_cluster_offsets[group_index + 1] - group_cluster_offsets[group_index];

      // local cluster index. the min() guards against the last value landing exactly on
      // the final weight limit.
      size_type const group_cluster_index =
        min(num_clusters - 1,
            static_cast<size_type>(
              thrust::lower_bound(
                thrust::seq, weight_limits, weight_limits + num_clusters, cumulative_weight) -
              weight_limits));

      // add the cluster offset to generate a globally unique key
      return group_cluster_index + group_cluster_offsets[group_index];
    });

  // reduce the centroids down by key.
  cudf::mutable_column_view mean_col =
    tdigests->child(cudf::detail::tdigest::mean_column_index).mutable_view();
  cudf::mutable_column_view weight_col =
    tdigests->child(cudf::detail::tdigest::weight_column_index).mutable_view();
  auto output = thrust::make_zip_iterator(thrust::make_tuple(
    mean_col.begin<double>(), weight_col.begin<double>(), thrust::make_discard_iterator()));
  auto const num_values = std::distance(centroids_begin, centroids_end);
  thrust::reduce_by_key(rmm::exec_policy(stream),
                        keys,
                        keys + num_values,                // keys
                        centroids_begin,                  // values
                        thrust::make_discard_iterator(),  // key output
                        output,                           // output
                        thrust::equal_to{},               // key equality check
                        merge_centroids{});

  // create the list
  auto const num_groups = group_cluster_offsets->size() - 1;
  auto list = cudf::make_lists_column(
    num_groups, std::move(group_cluster_offsets), std::move(tdigests), 0, {});

  // create final tdigest column
  std::vector<std::unique_ptr<column>> children;
  children.push_back(std::move(list));
  children.push_back(std::move(min_col));
  children.push_back(std::move(max_col));
  return make_structs_column(num_groups, std::move(children), 0, {}, stream, mr);
}
+
// retrieve total weight of scalar inputs by group index
struct scalar_total_weight {
  size_type const* group_valid_counts;  // per-group count of valid input values

  // with unit weights, a group's total weight is simply its valid count
  __device__ double operator()(size_type group_index)
  {
    return static_cast<double>(group_valid_counts[group_index]);
  }
};
+
// return the min/max value of scalar inputs by group index.
// assumes values within each group are sorted ascending with nulls last, so the
// first element is the min and the last valid element is the max.
template <typename T>
struct get_scalar_minmax {
  column_device_view const col;
  device_span<size_type const> group_offsets;
  size_type const* group_valid_counts;

  __device__ thrust::tuple<double, double> operator()(size_type group_index)
  {
    // note: .element<T>() is taking care of fixed-point conversions for us.
    return {static_cast<double>(col.element<T>(group_offsets[group_index])),
            static_cast<double>(
              col.element<T>(group_offsets[group_index] + (group_valid_counts[group_index] - 1)))};
  }
};
+
// type-dispatched implementation of group_tdigest. only numeric and fixed-point
// input types are supported.
struct typed_group_tdigest {
  template <
    typename T,
    typename std::enable_if_t<cudf::is_numeric<T>() || cudf::is_fixed_point<T>()>* = nullptr>
  std::unique_ptr<column> operator()(column_view const& col,
                                     cudf::device_span<size_type const> group_offsets,
                                     cudf::device_span<size_type const> group_labels,
                                     cudf::device_span<size_type const> group_valid_counts,
                                     size_type num_groups,
                                     int delta,
                                     rmm::cuda_stream_view stream,
                                     rmm::mr::device_memory_resource* mr)
  {
    // first, generate cluster weight information for each input group
    auto total_weight = cudf::detail::make_counting_transform_iterator(
      0, scalar_total_weight{group_valid_counts.begin()});
    auto [group_cluster_wl, group_cluster_offsets, total_clusters] =
      generate_group_cluster_info(delta,
                                  num_groups,
                                  nearest_value_scalar_weights{},
                                  total_weight,
                                  cumulative_scalar_weight{group_offsets, group_labels},
                                  stream,
                                  mr);

    // device column view. handy because the .element() function
    // automatically handles fixed-point conversions for us
    auto d_col = cudf::column_device_view::create(col, stream);

    // compute min and max columns
    auto min_col = cudf::make_fixed_width_column(
      data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
    auto max_col = cudf::make_fixed_width_column(
      data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
    thrust::transform(
      rmm::exec_policy(stream),
      thrust::make_counting_iterator(0),
      thrust::make_counting_iterator(0) + num_groups,
      thrust::make_zip_iterator(thrust::make_tuple(min_col->mutable_view().begin<double>(),
                                                   max_col->mutable_view().begin<double>())),
      get_scalar_minmax<T>{*d_col, group_offsets, group_valid_counts.begin()});

    // for simple input values, the "centroids" all have a weight of 1.
    auto scalar_to_centroid =
      cudf::detail::make_counting_transform_iterator(0, make_centroid<T>{*d_col});

    // generate the final tdigest
    return compute_tdigests(delta,
                            scalar_to_centroid,
                            scalar_to_centroid + col.size(),
                            cumulative_scalar_weight{group_offsets, group_labels},
                            std::move(min_col),
                            std::move(max_col),
                            group_cluster_wl,
                            std::move(group_cluster_offsets),
                            total_clusters,
                            stream,
                            mr);
  }

  template <
    typename T,
    typename std::enable_if_t<!cudf::is_numeric<T>() && !cudf::is_fixed_point<T>()>* = nullptr>
  std::unique_ptr<column> operator()(column_view const& col,
                                     cudf::device_span<size_type const> group_offsets,
                                     cudf::device_span<size_type const> group_labels,
                                     cudf::device_span<size_type const> group_valid_counts,
                                     size_type num_groups,
                                     int delta,
                                     rmm::cuda_stream_view stream,
                                     rmm::mr::device_memory_resource* mr)
  {
    CUDF_FAIL("Non-numeric type in group_tdigest");
  }
};
+
+} // anonymous namespace
+
// public entry point: build one tdigest per group from grouped scalar values.
// see group_reductions.hpp for the full contract.
std::unique_ptr<column> group_tdigest(column_view const& col,
                                      cudf::device_span<size_type const> group_offsets,
                                      cudf::device_span<size_type const> group_labels,
                                      cudf::device_span<size_type const> group_valid_counts,
                                      size_type num_groups,
                                      int max_centroids,
                                      rmm::cuda_stream_view stream,
                                      rmm::mr::device_memory_resource* mr)
{
  // no input -> canonical empty tdigest column
  if (col.size() == 0) { return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr); }

  // max_centroids is the tdigest "delta" compression parameter
  auto const delta = max_centroids;
  return cudf::type_dispatcher(col.type(),
                               typed_group_tdigest{},
                               col,
                               group_offsets,
                               group_labels,
                               group_valid_counts,
                               num_groups,
                               delta,
                               stream,
                               mr);
}
+
// public entry point: merge the tdigests within each group into a single new
// tdigest per group. see group_reductions.hpp for the full contract.
std::unique_ptr<column> group_merge_tdigest(column_view const& input,
                                            cudf::device_span<size_type const> group_offsets,
                                            cudf::device_span<size_type const> group_labels,
                                            size_type num_groups,
                                            int max_centroids,
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
{
  cudf::detail::tdigest::check_is_valid_tdigest_column(input);

  if (num_groups == 0 || input.size() == 0) {
    return cudf::detail::tdigest::make_empty_tdigest_column(stream, mr);
  }

  structs_column_view scv(input);
  lists_column_view lcv(scv.child(cudf::detail::tdigest::centroid_column_index));
  // ideally, we would just call .parent().child() here because tdigests cannot be
  // sliced. however, lists_column_view() hides that particular interface. However,
  // for the same reason, get_sliced_child() should be just as cheap.
  auto data = lcv.get_sliced_child(stream);
  structs_column_view tdigest(data);
  auto mean   = tdigest.child(cudf::detail::tdigest::mean_column_index);
  auto weight = tdigest.child(cudf::detail::tdigest::weight_column_index);

  // first step is to merge all the tdigests in each group. at the moment the only way to
  // make this work is to retrieve the group sizes (via group_offsets) and the individual digest
  // sizes (via input.offsets()) to the gpu and do the merges. The scale problem is that while the
  // size of each group will likely be small (size of each group will typically map to # of batches
  // the input data was chopped into for tdigest generation), the -number- of groups can be
  // arbitrarily large.
  //
  // thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an
  // algorithm like a super-merge that takes two layers of keys: one which identifies the outer
  // grouping of tdigests, and one which identifies the inner groupings of the tdigests within the
  // outer groups.

  // bring group offsets back to the host
  std::vector<size_type> h_outer_offsets(group_offsets.size());
  cudaMemcpyAsync(h_outer_offsets.data(),
                  group_offsets.data(),
                  sizeof(size_type) * group_offsets.size(),
                  cudaMemcpyDeviceToHost,
                  stream);

  // bring tdigest offsets back to the host
  auto tdigest_offsets = lcv.offsets();
  std::vector<size_type> h_inner_offsets(tdigest_offsets.size());
  cudaMemcpyAsync(h_inner_offsets.data(),
                  tdigest_offsets.begin<size_type>(),
                  sizeof(size_type) * tdigest_offsets.size(),
                  cudaMemcpyDeviceToHost,
                  stream);

  // both copies must land before the host reads the offsets below
  stream.synchronize();

  // extract all means and weights into a table
  cudf::table_view tdigests_unsliced({mean, weight});

  // generate the merged (but not yet compressed) tdigests for each group.
  std::vector<std::unique_ptr<table>> tdigests;
  tdigests.reserve(num_groups);
  std::transform(
    h_outer_offsets.begin(),
    h_outer_offsets.end() - 1,
    std::next(h_outer_offsets.begin()),
    std::back_inserter(tdigests),
    [&](auto tdigest_start, auto tdigest_end) {
      // the range of tdigests in this group
      auto const num_tdigests = tdigest_end - tdigest_start;

      // slice each tdigest from the input
      std::vector<table_view> unmerged_tdigests;
      unmerged_tdigests.reserve(num_tdigests);
      auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start);
      std::transform(offset_iter,
                     offset_iter + num_tdigests,
                     std::next(offset_iter),
                     std::back_inserter(unmerged_tdigests),
                     [&](auto start, auto end) {
                       return cudf::detail::slice(tdigests_unsliced, {start, end}, stream);
                     });

      // merge the slices, ordered by centroid mean (column 0)
      return cudf::detail::merge(unmerged_tdigests, {0}, {order::ASCENDING}, {}, stream, mr);
    });

  // generate min and max values
  auto min_col        = scv.child(cudf::detail::tdigest::min_column_index);
  auto merged_min_col = cudf::make_fixed_width_column(
    data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
  thrust::reduce_by_key(rmm::exec_policy(stream),
                        group_labels.begin(),
                        group_labels.end(),
                        min_col.begin<double>(),
                        thrust::make_discard_iterator(),
                        merged_min_col->mutable_view().begin<double>(),
                        thrust::equal_to{},  // key equality check
                        thrust::minimum{});

  auto max_col        = scv.child(cudf::detail::tdigest::max_column_index);
  auto merged_max_col = cudf::make_fixed_width_column(
    data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
  thrust::reduce_by_key(rmm::exec_policy(stream),
                        group_labels.begin(),
                        group_labels.end(),
                        max_col.begin<double>(),
                        thrust::make_discard_iterator(),
                        merged_max_col->mutable_view().begin<double>(),
                        thrust::equal_to{},  // key equality check
                        thrust::maximum{});

  // concatenate all the merged tdigests back into one table.
  std::vector<table_view> tdigest_views;
  tdigest_views.reserve(num_groups);
  std::transform(tdigests.begin(),
                 tdigests.end(),
                 std::back_inserter(tdigest_views),
                 [](std::unique_ptr<table> const& t) { return t->view(); });
  auto merged = cudf::detail::concatenate(tdigest_views, stream, mr);

  // generate cumulative weights. temporary working data, so no user mr; allocate on
  // the caller's stream so it is ordered with the work below.
  auto merged_weights     = merged->get_column(cudf::detail::tdigest::weight_column_index).view();
  auto cumulative_weights = cudf::make_fixed_width_column(
    data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
  auto keys = cudf::detail::make_counting_transform_iterator(
    0,
    [group_labels      = group_labels.begin(),
     inner_offsets     = tdigest_offsets.begin<offset_type>(),
     num_inner_offsets = tdigest_offsets.size()] __device__(int index) {
      // what -original- tdigest index this absolute index corresponds to
      auto const iter = thrust::prev(
        thrust::upper_bound(thrust::seq, inner_offsets, inner_offsets + num_inner_offsets, index));
      auto const tdigest_index = thrust::distance(inner_offsets, iter);

      // what group index the original tdigest belongs to
      return group_labels[tdigest_index];
    });
  thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
                                keys,
                                keys + cumulative_weights->size(),
                                merged_weights.begin<double>(),
                                cumulative_weights->mutable_view().begin<double>());

  auto const delta = max_centroids;

  // generate cluster info
  auto total_group_weight = cudf::detail::make_counting_transform_iterator(
    0,
    [outer_offsets = group_offsets.data(),
     inner_offsets = tdigest_offsets.begin<offset_type>(),
     cumulative_weights =
       cumulative_weights->view().begin<double>()] __device__(size_type group_index) {
      // total weight of a group == cumulative weight at the group's last centroid
      auto const last_weight_index = inner_offsets[outer_offsets[group_index + 1]] - 1;
      return cumulative_weights[last_weight_index];
    });
  auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
    delta,
    num_groups,
    nearest_value_centroid_weights{cumulative_weights->view().begin<double>(),
                                   group_offsets.data(),
                                   tdigest_offsets.begin<offset_type>()},
    total_group_weight,
    cumulative_centroid_weight{
      cumulative_weights->view().begin<double>(),
      group_labels,
      group_offsets.data(),
      {tdigest_offsets.begin<offset_type>(), static_cast<size_t>(tdigest_offsets.size())}},
    stream,
    mr);

  // input centroid values
  auto centroids = cudf::detail::make_counting_transform_iterator(
    0,
    make_weighted_centroid{
      merged->get_column(cudf::detail::tdigest::mean_column_index).view().begin<double>(),
      merged_weights.begin<double>()});

  // compute the tdigest
  return compute_tdigests(delta,
                          centroids,
                          centroids + merged->num_rows(),
                          cumulative_centroid_weight{
                            cumulative_weights->view().begin<double>(),
                            group_labels,
                            group_offsets.data(),
                            {tdigest_offsets.begin<offset_type>(),
                             static_cast<size_t>(tdigest_offsets.size())}},
                          std::move(merged_min_col),
                          std::move(merged_max_col),
                          group_cluster_wl,
                          std::move(group_cluster_offsets),
                          total_clusters,
                          stream,
                          mr);
}
+
+} // namespace detail
+} // namespace groupby
+} // namespace cudf
diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu
new file mode 100644
index 00000000000..9aea59a195b
--- /dev/null
+++ b/cpp/src/quantiles/tdigest/tdigest.cu
@@ -0,0 +1,383 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include