Skip to content

Commit

Permalink
Support for using tdigests to compute approximate percentiles. (#8983)
Browse files Browse the repository at this point in the history
Addresses  #7170

Adds 3 pieces of new functionality:

- A `TDIGEST` aggregation which creates a tdigest column (https://arxiv.org/pdf/1902.04023.pdf) from a stream of input scalars.
- A `MERGE_TDIGEST` aggregation which merges multiple tdigest columns into a new one.
- a `percentile_approx` function which performs percentile queries on tdigest data.

Also exposes several ::detail functions (`sort`, `merge`, `slice`) in detail headers.

Ready for review.  I do need to add more tests though.

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - AJ Schmidt (https://github.com/ajschmidt8)
  - Jake Hemstad (https://github.com/jrhemstad)
  - MithunR (https://github.com/mythrocks)
  - Robert Maynard (https://github.com/robertmaynard)

URL: #8983
  • Loading branch information
nvdbaranec authored Sep 24, 2021
1 parent ad76ed1 commit ba76310
Show file tree
Hide file tree
Showing 25 changed files with 2,919 additions and 48 deletions.
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ test:
- test -f $PREFIX/include/cudf/detail/sequence.hpp
- test -f $PREFIX/include/cudf/detail/sorting.hpp
- test -f $PREFIX/include/cudf/detail/stream_compaction.hpp
- test -f $PREFIX/include/cudf/detail/tdigest/tdigest.hpp
- test -f $PREFIX/include/cudf/detail/transform.hpp
- test -f $PREFIX/include/cudf/detail/transpose.hpp
- test -f $PREFIX/include/cudf/detail/unary.hpp
Expand Down
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ add_library(cudf
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_rank_scan.cu
src/groupby/sort/group_sum_scan.cu
src/groupby/sort/group_replace_nulls.cu
src/groupby/sort/group_sum_scan.cu
src/groupby/sort/group_tdigest.cu
src/groupby/sort/sort_helper.cu
src/hash/hashing.cu
src/hash/md5_hash.cu
Expand Down Expand Up @@ -318,6 +319,7 @@ add_library(cudf
src/merge/merge.cu
src/partitioning/partitioning.cu
src/partitioning/round_robin.cu
src/quantiles/tdigest/tdigest.cu
src/quantiles/quantile.cu
src/quantiles/quantiles.cu
src/reductions/all.cu
Expand Down
79 changes: 78 additions & 1 deletion cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class aggregation {
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2 ///< merge partial values of M2 aggregation
MERGE_M2, ///< merge partial values of M2 aggregation
TDIGEST, ///< create a tdigest from a set of input values
MERGE_TDIGEST ///< create a tdigest by merging multiple tdigests together
};

aggregation() = delete;
Expand Down Expand Up @@ -493,5 +495,80 @@ std::unique_ptr<Base> make_merge_sets_aggregation(null_equality nulls_equal = nu
template <typename Base = aggregation>
std::unique_ptr<Base> make_merge_m2_aggregation();

/**
* @brief Factory to create a TDIGEST aggregation
*
* Produces a tdigest (https://arxiv.org/pdf/1902.04023.pdf) column from input values.
* The input aggregation values are expected to be fixed-width numeric types.
*
* The tdigest column produced is of the following structure:
*
* struct {
* // centroids for the digest
* list {
* struct {
* double // mean
* double // weight
* },
* ...
* }
* // these are from the input stream, not the centroids. they are used
* // during the percentile_approx computation near the beginning or
* // end of the quantiles
* double // min
* double // max
* }
*
* Each output row is a single tdigest. The length of the row is the "size" of the
* tdigest, each element of which represents a weighted centroid (mean, weight).
*
* @param max_centroids Parameter controlling compression level and accuracy on subsequent
* queries on the output tdigest data. `max_centroids` places an upper bound on the size of
* the computed tdigests: A value of 1000 will result in a tdigest containing no
* more than 1000 centroids (32 bytes each). Higher result in more accurate tdigest information.
*
* @returns A TDIGEST aggregation object.
*/
template <typename Base>
std::unique_ptr<Base> make_tdigest_aggregation(int max_centroids = 1000);

/**
* @brief Factory to create a MERGE_TDIGEST aggregation
*
* Merges the results from a previous aggregation resulting from a `make_tdigest_aggregation`
* or `make_merge_tdigest_aggregation` to produce a new a tdigest
* (https://arxiv.org/pdf/1902.04023.pdf) column.
*
* The tdigest column produced is of the following structure:
*
* struct {
* // centroids for the digest
* list {
* struct {
* double // mean
* double // weight
* },
* ...
* }
* // these are from the input stream, not the centroids. they are used
* // during the percentile_approx computation near the beginning or
* // end of the quantiles
* double // min
* double // max
* }
*
* Each output row is a single tdigest. The length of the row is the "size" of the
* tdigest, each element of which represents a weighted centroid (mean, weight).
*
* @param max_centroids Parameter controlling compression level and accuracy on subsequent
* queries on the output tdigest data. `max_centroids` places an upper bound on the size of
* the computed tdigests: A value of 1000 will result in a tdigest containing no
* more than 1000 centroids (32 bytes each). Higher result in more accurate tdigest information.
*
* @returns A MERGE_TDIGEST aggregation object.
*/
template <typename Base>
std::unique_ptr<Base> make_merge_tdigest_aggregation(int max_centroids = 1000);

/** @} */ // end of group
} // namespace cudf
76 changes: 76 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class simple_aggregations_collector { // Declares the interface for the simple
class merge_sets_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class merge_m2_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class tdigest_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
data_type col_type, class merge_tdigest_aggregation const& agg);
};

class aggregation_finalizer { // Declares the interface for the finalizer
Expand Down Expand Up @@ -125,6 +129,8 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class merge_lists_aggregation const& agg);
virtual void visit(class merge_sets_aggregation const& agg);
virtual void visit(class merge_m2_aggregation const& agg);
virtual void visit(class tdigest_aggregation const& agg);
virtual void visit(class merge_tdigest_aggregation const& agg);
};

/**
Expand Down Expand Up @@ -884,6 +890,54 @@ class merge_m2_aggregation final : public groupby_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived aggregation class for specifying TDIGEST aggregation
*/
class tdigest_aggregation final : public groupby_aggregation {
public:
explicit tdigest_aggregation(int max_centroids_)
: aggregation{TDIGEST}, max_centroids{max_centroids_}
{
}

int const max_centroids;

std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<tdigest_aggregation>(*this);
}
std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived aggregation class for specifying MERGE_TDIGEST aggregation
*/
class merge_tdigest_aggregation final : public groupby_aggregation {
public:
explicit merge_tdigest_aggregation(int max_centroids_)
: aggregation{MERGE_TDIGEST}, max_centroids{max_centroids_}
{
}

int const max_centroids;

std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<merge_tdigest_aggregation>(*this);
}
std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Sentinel value used for `ARGMAX` aggregation.
*
Expand Down Expand Up @@ -1120,6 +1174,24 @@ struct target_type_impl<SourceType, aggregation::MERGE_M2> {
using type = struct_view;
};

// Always use numeric types for TDIGEST
template <typename Source>
struct target_type_impl<Source,
aggregation::TDIGEST,
std::enable_if_t<(is_numeric<Source>() || is_fixed_point<Source>())>> {
using type = struct_view;
};

// TDIGEST_MERGE. The root column type for a tdigest column is a list_view. Strictly
// speaking, this check is not sufficient to guarantee we are actually being given a
// real tdigest column, but we will do further verification inside the aggregation code.
template <typename Source>
struct target_type_impl<Source,
aggregation::MERGE_TDIGEST,
std::enable_if_t<std::is_same_v<Source, cudf::struct_view>>> {
using type = struct_view;
};

/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
Expand Down Expand Up @@ -1224,6 +1296,10 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
return f.template operator()<aggregation::MERGE_SETS>(std::forward<Ts>(args)...);
case aggregation::MERGE_M2:
return f.template operator()<aggregation::MERGE_M2>(std::forward<Ts>(args)...);
case aggregation::TDIGEST:
return f.template operator()<aggregation::TDIGEST>(std::forward<Ts>(args)...);
case aggregation::MERGE_TDIGEST:
return f.template operator()<aggregation::MERGE_TDIGEST>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
9 changes: 9 additions & 0 deletions cpp/include/cudf/detail/copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ std::vector<column_view> slice(column_view const& input,
std::vector<size_type> const& indices,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::slice(table_view const&,std::vector<size_type> const&)
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
std::vector<table_view> slice(table_view const& input,
std::vector<size_type> const& indices,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::shift(column_view const&,size_type,scalar const&,
* rmm::mr::device_memory_resource*)
Expand Down
17 changes: 17 additions & 0 deletions cpp/include/cudf/detail/merge.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,22 @@ struct row_lexicographic_tagged_comparator {
order const* _column_order{};
};

/**
* @copydoc std::unique_ptr<cudf::table> merge(
* std::vector<table_view> const& tables_to_merge,
* std::vector<cudf::size_type> const& key_cols,
* std::vector<cudf::order> const& column_order,
* std::vector<cudf::null_order> const& null_precedence,
* rmm::mr::device_memory_resource* mr)
*
* @param stream CUDA stream used for device memory operations and kernel launches
*/
std::unique_ptr<cudf::table> merge(std::vector<table_view> const& tables_to_merge,
std::vector<cudf::size_type> const& key_cols,
std::vector<cudf::order> const& column_order,
std::vector<cudf::null_order> const& null_precedence,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace cudf
18 changes: 16 additions & 2 deletions cpp/include/cudf/detail/quantiles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
namespace cudf {
namespace detail {

/** @copydoc cudf::quantile()
/**
* @copydoc cudf::quantile()
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
Expand All @@ -35,7 +36,8 @@ std::unique_ptr<column> quantile(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @copydoc cudf::quantiles()
/**
* @copydoc cudf::quantiles()
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
Expand All @@ -49,5 +51,17 @@ std::unique_ptr<table> quantiles(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::percentile_approx(column_view const&, column_view const&,
* rmm::mr::device_memory_resource*)
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<column> percentile_approx(
column_view const& input,
column_view const& percentiles,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace detail
} // namespace cudf
16 changes: 14 additions & 2 deletions cpp/include/cudf/detail/sorting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace detail {
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<column> sorted_order(
table_view input,
table_view const& input,
std::vector<order> const& column_order = {},
std::vector<null_order> const& null_precedence = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
Expand All @@ -44,7 +44,7 @@ std::unique_ptr<column> sorted_order(
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<column> stable_sorted_order(
table_view input,
table_view const& input,
std::vector<order> const& column_order = {},
std::vector<null_order> const& null_precedence = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
Expand Down Expand Up @@ -90,5 +90,17 @@ std::unique_ptr<table> segmented_sort_by_key(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::sort
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<table> sort(
table_view const& values,
std::vector<order> const& column_order = {},
std::vector<null_order> const& null_precedence = {},
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace detail
} // namespace cudf
Loading

0 comments on commit ba76310

Please sign in to comment.