diff --git a/cpp/include/cudf/detail/aggregation/result_cache.hpp b/cpp/include/cudf/detail/aggregation/result_cache.hpp index ebb1ea784e5..41f5c19f06a 100644 --- a/cpp/include/cudf/detail/aggregation/result_cache.hpp +++ b/cpp/include/cudf/detail/aggregation/result_cache.hpp @@ -18,20 +18,26 @@ #include #include +#include +#include #include namespace cudf { namespace detail { -struct aggregation_equality { - bool operator()(aggregation const& lhs, aggregation const& rhs) const +struct pair_column_aggregation_equal_to { + bool operator()(std::pair const& lhs, + std::pair const& rhs) const { - return lhs.is_equal(rhs); + return is_shallow_equivalent(lhs.first, rhs.first) and lhs.second.is_equal(rhs.second); } }; -struct aggregation_hash { - size_t operator()(aggregation const& key) const noexcept { return key.do_hash(); } +struct pair_column_aggregation_hash { + size_t operator()(std::pair const& key) const + { + return hash_combine(shallow_hash(key.first), key.second.do_hash()); + } }; class result_cache { @@ -43,19 +49,19 @@ class result_cache { result_cache(size_t num_columns) : _cache(num_columns) {} - bool has_result(size_t col_idx, aggregation const& agg) const; + bool has_result(column_view const& input, aggregation const& agg) const; - void add_result(size_t col_idx, aggregation const& agg, std::unique_ptr&& col); + void add_result(column_view const& input, aggregation const& agg, std::unique_ptr&& col); - column_view get_result(size_t col_idx, aggregation const& agg) const; + column_view get_result(column_view const& input, aggregation const& agg) const; - std::unique_ptr release_result(size_t col_idx, aggregation const& agg); + std::unique_ptr release_result(column_view const& input, aggregation const& agg); private: - std::vector, - std::pair, std::unique_ptr>, - aggregation_hash, - aggregation_equality>> + std::unordered_map>, + std::pair, std::unique_ptr>, + pair_column_aggregation_hash, + pair_column_aggregation_equal_to> _cache; }; diff --git a/cpp/src/aggregation/result_cache.cpp b/cpp/src/aggregation/result_cache.cpp index 36668af5355..1889ae67ee3 100644 --- a/cpp/src/aggregation/result_cache.cpp +++ b/cpp/src/aggregation/result_cache.cpp @@ -19,39 +19,36 @@ namespace cudf { namespace detail { -bool result_cache::has_result(size_t col_idx, aggregation const& agg) const +bool result_cache::has_result(column_view const& input, aggregation const& agg) const { - if (col_idx > _cache.size()) return false; - - auto result_it = _cache[col_idx].find(agg); - - return (result_it != _cache[col_idx].end()); + return _cache.count({input, agg}); } -void result_cache::add_result(size_t col_idx, aggregation const& agg, std::unique_ptr&& col) +void result_cache::add_result(column_view const& input, + aggregation const& agg, + std::unique_ptr&& col) { // We can't guarantee that agg will outlive the cache, so we need to take ownership of a copy. // To allow lookup by reference, make the key a reference and keep the owner in the value pair. - auto owned_agg = agg.clone(); - auto const& key = *owned_agg; - auto value = std::make_pair(std::move(owned_agg), std::move(col)); - _cache[col_idx].emplace(key, std::move(value)); + auto owned_agg = agg.clone(); + auto const& key = *owned_agg; + auto value = std::make_pair(std::move(owned_agg), std::move(col)); + _cache[{input, key}] = std::move(value); } -column_view result_cache::get_result(size_t col_idx, aggregation const& agg) const +column_view result_cache::get_result(column_view const& input, aggregation const& agg) const { - CUDF_EXPECTS(has_result(col_idx, agg), "Result does not exist in cache"); - - auto result_it = _cache[col_idx].find(agg); + auto result_it = _cache.find({input, agg}); + CUDF_EXPECTS(result_it != _cache.end(), "Result does not exist in cache"); return result_it->second.second->view(); } -std::unique_ptr result_cache::release_result(size_t col_idx, aggregation const& agg) +std::unique_ptr result_cache::release_result(column_view const& input, + aggregation const& agg) { - CUDF_EXPECTS(has_result(col_idx, agg), "Result does not exist in cache"); - - auto result_it = _cache[col_idx].extract(agg); - return std::move(result_it.mapped().second); + auto node = _cache.extract({input, agg}); + CUDF_EXPECTS(not node.empty(), "Result does not exist in cache"); + return std::move(node.mapped().second); } } // namespace detail diff --git a/cpp/src/groupby/common/utils.hpp b/cpp/src/groupby/common/utils.hpp index 2804dea576e..e3611eb0e4b 100644 --- a/cpp/src/groupby/common/utils.hpp +++ b/cpp/src/groupby/common/utils.hpp @@ -19,6 +19,8 @@ #include #include #include + +#include #include namespace cudf { @@ -30,10 +32,24 @@ inline std::vector extract_results(host_span results(requests.size()); - + std::unordered_map>, + column_view, + cudf::detail::pair_column_aggregation_hash, + cudf::detail::pair_column_aggregation_equal_to> + repeated_result; for (size_t i = 0; i < requests.size(); i++) { for (auto&& agg : requests[i].aggregations) { - results[i].results.emplace_back(cache.release_result(i, *agg)); + if (cache.has_result(requests[i].values, *agg)) { + results[i].results.emplace_back(cache.release_result(requests[i].values, *agg)); + repeated_result[{requests[i].values, *agg}] = results[i].results.back()->view(); + } else { + auto it = repeated_result.find({requests[i].values, *agg}); + if (it != repeated_result.end()) { + results[i].results.emplace_back(std::make_unique(it->second)); + } else { + CUDF_FAIL("Cannot extract result from the cache"); + } + } } } return results; diff --git a/cpp/src/groupby/hash/groupby.cu b/cpp/src/groupby/hash/groupby.cu index 247580bb8ee..b8150f7fd14 100644 --- a/cpp/src/groupby/hash/groupby.cu +++ b/cpp/src/groupby/hash/groupby.cu @@ -162,7 +162,6 @@ class groupby_simple_aggregations_collector final template class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer { - size_t col_idx; column_view col; data_type result_type; cudf::detail::result_cache* sparse_results; @@ -176,8 +175,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final public: using cudf::detail::aggregation_finalizer::visit; - hash_compound_agg_finalizer(size_t col_idx, - column_view col, + hash_compound_agg_finalizer(column_view col, cudf::detail::result_cache* sparse_results, cudf::detail::result_cache* dense_results, device_span gather_map, @@ -185,8 +183,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final bitmask_type const* row_bitmask, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : col_idx(col_idx), - col(col), + : col(col), sparse_results(sparse_results), dense_results(dense_results), gather_map(gather_map), @@ -201,8 +198,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final auto to_dense_agg_result(cudf::aggregation const& agg) { - auto s = sparse_results->get_result(col_idx, agg); - + auto s = sparse_results->get_result(col, agg); auto dense_result_table = cudf::detail::gather(table_view({std::move(s)}), gather_map, out_of_bounds_policy::DONT_CHECK, @@ -239,43 +235,43 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final // Declare overloads for each kind of aggregation to dispatch void visit(cudf::aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; - dense_results->add_result(col_idx, agg, to_dense_agg_result(agg)); + if (dense_results->has_result(col, agg)) return; + dense_results->add_result(col, agg, to_dense_agg_result(agg)); } void visit(cudf::detail::min_aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; + if (dense_results->has_result(col, agg)) return; if (result_type.id() == type_id::STRING) { auto transformed_agg = make_argmin_aggregation(); - dense_results->add_result(col_idx, agg, gather_argminmax(*transformed_agg)); + dense_results->add_result(col, agg, gather_argminmax(*transformed_agg)); } else { - dense_results->add_result(col_idx, agg, to_dense_agg_result(agg)); + dense_results->add_result(col, agg, to_dense_agg_result(agg)); } } void visit(cudf::detail::max_aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; + if (dense_results->has_result(col, agg)) return; if (result_type.id() == type_id::STRING) { auto transformed_agg = make_argmax_aggregation(); - dense_results->add_result(col_idx, agg, gather_argminmax(*transformed_agg)); + dense_results->add_result(col, agg, gather_argminmax(*transformed_agg)); } else { - dense_results->add_result(col_idx, agg, to_dense_agg_result(agg)); + dense_results->add_result(col, agg, to_dense_agg_result(agg)); } } void visit(cudf::detail::mean_aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; + if (dense_results->has_result(col, agg)) return; auto sum_agg = make_sum_aggregation(); auto count_agg = make_count_aggregation(); this->visit(*sum_agg); this->visit(*count_agg); - column_view sum_result = dense_results->get_result(col_idx, *sum_agg); - column_view count_result = dense_results->get_result(col_idx, *count_agg); + column_view sum_result = dense_results->get_result(col, *sum_agg); + column_view count_result = dense_results->get_result(col, *count_agg); auto result = cudf::detail::binary_operation(sum_result, @@ -284,19 +280,19 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final cudf::detail::target_type(result_type, aggregation::MEAN), stream, mr); - dense_results->add_result(col_idx, agg, std::move(result)); + dense_results->add_result(col, agg, std::move(result)); } void visit(cudf::detail::var_aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; + if (dense_results->has_result(col, agg)) return; auto sum_agg = make_sum_aggregation(); auto count_agg = make_count_aggregation(); this->visit(*sum_agg); this->visit(*count_agg); - column_view sum_result = sparse_results->get_result(col_idx, *sum_agg); - column_view count_result = sparse_results->get_result(col_idx, *count_agg); + column_view sum_result = sparse_results->get_result(col, *sum_agg); + column_view count_result = sparse_results->get_result(col, *count_agg); auto values_view = column_device_view::create(col); auto sum_view = column_device_view::create(sum_result); @@ -314,47 +310,40 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final col.size(), ::cudf::detail::var_hash_functor{ map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof}); - sparse_results->add_result(col_idx, agg, std::move(var_result)); - dense_results->add_result(col_idx, agg, to_dense_agg_result(agg)); + sparse_results->add_result(col, agg, std::move(var_result)); + dense_results->add_result(col, agg, to_dense_agg_result(agg)); } void visit(cudf::detail::std_aggregation const& agg) override { - if (dense_results->has_result(col_idx, agg)) return; + if (dense_results->has_result(col, agg)) return; auto var_agg = make_variance_aggregation(agg._ddof); this->visit(*dynamic_cast(var_agg.get())); - column_view variance = dense_results->get_result(col_idx, *var_agg); + column_view variance = dense_results->get_result(col, *var_agg); auto result = cudf::detail::unary_operation(variance, unary_operator::SQRT, stream, mr); - dense_results->add_result(col_idx, agg, std::move(result)); + dense_results->add_result(col, agg, std::move(result)); } }; // flatten aggs to filter in single pass aggs -std::tuple, - std::vector>, - std::vector> +std::tuple, std::vector>> flatten_single_pass_aggs(host_span requests) { std::vector columns; std::vector> aggs; std::vector agg_kinds; - std::vector col_ids; - for (size_t i = 0; i < requests.size(); i++) { - auto const& request = requests[i]; - auto const& agg_v = request.aggregations; + for (auto const& request : requests) { + auto const& agg_v = request.aggregations; std::unordered_set agg_kinds_set; - auto insert_agg = - [&](size_t i, column_view const& request_values, std::unique_ptr&& agg) { - if (agg_kinds_set.insert(agg->kind).second) { - agg_kinds.push_back(agg->kind); - aggs.push_back(std::move(agg)); - columns.push_back(request_values); - col_ids.push_back(i); - } - }; + auto insert_agg = [&](column_view const& request_values, std::unique_ptr&& agg) { + if (agg_kinds_set.insert(agg->kind).second) { + agg_kinds.push_back(agg->kind); + aggs.push_back(std::move(agg)); + columns.push_back(request_values); + } + }; auto values_type = cudf::is_dictionary(request.values.type()) ? cudf::dictionary_column_view(request.values).keys().type() @@ -363,13 +352,12 @@ flatten_single_pass_aggs(host_span requests) groupby_simple_aggregations_collector collector; for (auto& agg_s : agg->get_simple_aggregations(values_type, collector)) { - insert_agg(i, request.values, std::move(agg_s)); + insert_agg(request.values, std::move(agg_s)); } } } - return std::make_tuple( - table_view(columns), std::move(agg_kinds), std::move(aggs), std::move(col_ids)); + return std::make_tuple(table_view(columns), std::move(agg_kinds), std::move(aggs)); } /** @@ -396,14 +384,14 @@ void sparse_to_dense_results(table_view const& keys, bitmask_type const* row_bitmask_ptr = skip_key_rows_with_nulls ? static_cast(row_bitmask.data()) : nullptr; - for (size_t i = 0; i < requests.size(); i++) { - auto const& agg_v = requests[i].aggregations; - auto const& col = requests[i].values; + for (auto const& request : requests) { + auto const& agg_v = request.aggregations; + auto const& col = request.values; // Given an aggregation, this will get the result from sparse_results and // convert and return dense, compacted result auto finalizer = hash_compound_agg_finalizer( - i, col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr); + col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr); for (auto&& agg : agg_v) { agg->finalize(finalizer); } @@ -491,7 +479,7 @@ void compute_single_pass_aggs(table_view const& keys, rmm::cuda_stream_view stream) { // flatten the aggs to a table that can be operated on by aggregate_row - auto const [flattened_values, agg_kinds, aggs, col_ids] = flatten_single_pass_aggs(requests); + auto const [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests); // make table that will hold sparse results table sparse_table = create_sparse_results_table(flattened_values, agg_kinds, stream); @@ -519,7 +507,8 @@ void compute_single_pass_aggs(table_view const& keys, auto sparse_result_cols = sparse_table.release(); for (size_t i = 0; i < aggs.size(); i++) { // Note that the cache will make a copy of this temporary aggregation - sparse_results->add_result(col_ids[i], *aggs[i], std::move(sparse_result_cols[i])); + sparse_results->add_result( + flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i])); } } diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 9f3d67ac38b..1547964f3f4 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -64,10 +64,10 @@ struct aggregate_result_functor final : store_result_functor { template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, get_grouped_values().nullable() ? detail::group_count_valid( @@ -79,10 +79,10 @@ void aggregate_result_functor::operator()(aggregation template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::group_count_all(helper.group_offsets(stream), helper.num_groups(stream), stream, mr)); } @@ -90,10 +90,10 @@ void aggregate_result_functor::operator()(aggregation co template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::group_sum( get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr)); @@ -102,10 +102,10 @@ void aggregate_result_functor::operator()(aggregation const& a template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::group_product( get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr)); @@ -114,9 +114,9 @@ void aggregate_result_functor::operator()(aggregation cons template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; - cache.add_result(col_idx, + cache.add_result(values, agg, detail::group_argmax(get_grouped_values(), helper.num_groups(stream), @@ -129,9 +129,9 @@ void aggregate_result_functor::operator()(aggregation const template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; - cache.add_result(col_idx, + cache.add_result(values, agg, detail::group_argmin(get_grouped_values(), helper.num_groups(stream), @@ -144,7 +144,7 @@ void aggregate_result_functor::operator()(aggregation const template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto result = [&]() { auto values_type = cudf::is_dictionary(values.type()) @@ -156,7 +156,7 @@ void aggregate_result_functor::operator()(aggregation const& a } else { auto argmin_agg = make_argmin_aggregation(); operator()(*argmin_agg); - column_view argmin_result = cache.get_result(col_idx, *argmin_agg); + column_view argmin_result = cache.get_result(values, *argmin_agg); // We make a view of ARGMIN result without a null mask and gather using // this mask. The values in data buffer of ARGMIN result corresponding @@ -178,13 +178,13 @@ void aggregate_result_functor::operator()(aggregation const& a } }(); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto result = [&]() { auto values_type = cudf::is_dictionary(values.type()) @@ -196,7 +196,7 @@ void aggregate_result_functor::operator()(aggregation const& a } else { auto argmax_agg = make_argmax_aggregation(); operator()(*argmax_agg); - column_view argmax_result = cache.get_result(col_idx, *argmax_agg); + column_view argmax_result = cache.get_result(values, *argmax_agg); // We make a view of ARGMAX result without a null mask and gather using // this mask. The values in data buffer of ARGMAX result corresponding @@ -218,20 +218,20 @@ void aggregate_result_functor::operator()(aggregation const& a } }(); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto sum_agg = make_sum_aggregation(); auto count_agg = make_count_aggregation(); operator()(*sum_agg); operator()(*count_agg); - column_view sum_result = cache.get_result(col_idx, *sum_agg); - column_view count_result = cache.get_result(col_idx, *count_agg); + column_view sum_result = cache.get_result(values, *sum_agg); + column_view count_result = cache.get_result(values, *count_agg); // TODO (dm): Special case for timestamp. Add target_type_impl for it. // Blocked until we support operator+ on timestamps @@ -242,20 +242,20 @@ void aggregate_result_functor::operator()(aggregation const& cudf::detail::target_type(values.type(), aggregation::MEAN), stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto const mean_agg = make_mean_aggregation(); operator()(*mean_agg); - auto const mean_result = cache.get_result(col_idx, *mean_agg); + auto const mean_result = cache.get_result(values, *mean_agg); cache.add_result( - col_idx, + values, agg, detail::group_m2(get_grouped_values(), mean_result, helper.group_labels(stream), stream, mr)); }; @@ -263,15 +263,15 @@ void aggregate_result_functor::operator()(aggregation const& ag template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto& var_agg = dynamic_cast(agg); auto mean_agg = make_mean_aggregation(); auto count_agg = make_count_aggregation(); operator()(*mean_agg); operator()(*count_agg); - column_view mean_result = cache.get_result(col_idx, *mean_agg); - column_view group_sizes = cache.get_result(col_idx, *count_agg); + column_view mean_result = cache.get_result(values, *mean_agg); + column_view group_sizes = cache.get_result(values, *count_agg); auto result = detail::group_var(get_grouped_values(), mean_result, @@ -280,31 +280,31 @@ void aggregate_result_functor::operator()(aggregation con var_agg._ddof, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto& std_agg = dynamic_cast(agg); auto var_agg = make_variance_aggregation(std_agg._ddof); operator()(*var_agg); - column_view var_result = cache.get_result(col_idx, *var_agg); + column_view var_result = cache.get_result(values, *var_agg); auto result = cudf::detail::unary_operation(var_result, unary_operator::SQRT, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto count_agg = make_count_aggregation(); operator()(*count_agg); - column_view group_sizes = cache.get_result(col_idx, *count_agg); + column_view group_sizes = cache.get_result(values, *count_agg); auto& quantile_agg = dynamic_cast(agg); auto result = detail::group_quantiles(get_sorted_values(), @@ -315,17 +315,17 @@ void aggregate_result_functor::operator()(aggregation con quantile_agg._interpolation, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto count_agg = make_count_aggregation(); operator()(*count_agg); - column_view group_sizes = cache.get_result(col_idx, *count_agg); + column_view group_sizes = cache.get_result(values, *count_agg); auto result = detail::group_quantiles(get_sorted_values(), group_sizes, @@ -335,13 +335,13 @@ void aggregate_result_functor::operator()(aggregation const interpolation::LINEAR, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto& nunique_agg = dynamic_cast(agg); @@ -352,13 +352,13 @@ void aggregate_result_functor::operator()(aggregation cons nunique_agg._null_handling, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; auto& nth_element_agg = dynamic_cast(agg); @@ -370,9 +370,9 @@ void aggregate_result_functor::operator()(aggregation } else { CUDF_FAIL("Wrong count aggregation kind"); } - column_view group_sizes = cache.get_result(col_idx, *count_agg); + column_view group_sizes = cache.get_result(values, *count_agg); - cache.add_result(col_idx, + cache.add_result(values, agg, detail::group_nth_element(get_grouped_values(), group_sizes, @@ -388,7 +388,7 @@ void aggregate_result_functor::operator()(aggregation template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } auto const null_handling = dynamic_cast(agg)._null_handling; @@ -398,13 +398,13 @@ void aggregate_result_functor::operator()(aggregation null_handling, stream, mr); - cache.add_result(col_idx, agg, std::move(result)); + cache.add_result(values, agg, std::move(result)); }; template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } auto const null_handling = dynamic_cast(agg)._null_handling; @@ -419,7 +419,7 @@ void aggregate_result_functor::operator()(aggregation auto const nans_equal = dynamic_cast(agg)._nans_equal; cache.add_result( - col_idx, + values, agg, lists::detail::drop_list_duplicates( lists_column_view(collect_result->view()), nulls_equal, nans_equal, stream, mr)); @@ -443,10 +443,10 @@ void aggregate_result_functor::operator()(aggregation template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } cache.add_result( - col_idx, + values, agg, detail::group_merge_lists( get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr)); @@ -479,7 +479,7 @@ void aggregate_result_functor::operator()(aggregation template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } auto const merged_result = detail::group_merge_lists(get_grouped_values(), helper.group_offsets(stream), @@ -487,7 +487,7 @@ void aggregate_result_functor::operator()(aggregation c stream, rmm::mr::get_current_device_resource()); auto const& merge_sets_agg = dynamic_cast(agg); - cache.add_result(col_idx, + cache.add_result(values, agg, lists::detail::drop_list_duplicates(lists_column_view(merged_result->view()), merge_sets_agg._nulls_equal, @@ -516,10 +516,10 @@ void aggregate_result_functor::operator()(aggregation c template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } cache.add_result( - col_idx, + values, agg, detail::group_merge_m2( get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr)); @@ -552,16 +552,16 @@ void aggregate_result_functor::operator()(aggregation con template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } auto const max_centroids = dynamic_cast(agg).max_centroids; auto count_agg = make_count_aggregation(); operator()(*count_agg); - column_view valid_counts = cache.get_result(col_idx, *count_agg); + column_view valid_counts = cache.get_result(values, *count_agg); - cache.add_result(col_idx, + cache.add_result(values, agg, detail::group_tdigest( get_sorted_values(), @@ -601,11 +601,11 @@ void aggregate_result_functor::operator()(aggregation cons template <> void aggregate_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) { return; } + if (cache.has_result(values, agg)) { return; } auto const max_centroids = dynamic_cast(agg).max_centroids; - cache.add_result(col_idx, + cache.add_result(values, agg, detail::group_merge_tdigest(get_grouped_values(), helper.group_offsets(stream), @@ -629,13 +629,12 @@ std::pair, std::vector> groupby::sort // sum and count. std depends on mean and count cudf::detail::result_cache cache(requests.size()); - for (size_t i = 0; i < requests.size(); i++) { + for (auto const& request : requests) { auto store_functor = - detail::aggregate_result_functor(i, requests[i].values, helper(), cache, stream, mr); - for (size_t j = 0; j < requests[i].aggregations.size(); j++) { + detail::aggregate_result_functor(request.values, helper(), cache, stream, mr); + for (auto const& agg : request.aggregations) { // TODO (dm): single pass compute all supported reductions - cudf::detail::aggregation_dispatcher( - requests[i].aggregations[j]->kind, store_functor, *requests[i].aggregations[j]); + cudf::detail::aggregation_dispatcher(agg->kind, store_functor, *agg); } } diff --git a/cpp/src/groupby/sort/functors.hpp b/cpp/src/groupby/sort/functors.hpp index afb92f8e141..cbe5f08639a 100644 --- a/cpp/src/groupby/sort/functors.hpp +++ b/cpp/src/groupby/sort/functors.hpp @@ -36,13 +36,12 @@ namespace detail { * of these values. */ struct store_result_functor { - store_result_functor(size_type col_idx, - column_view const& values, + store_result_functor(column_view const& values, sort::sort_groupby_helper& helper, cudf::detail::result_cache& cache, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : col_idx(col_idx), helper(helper), cache(cache), values(values), stream(stream), mr(mr) + : helper(helper), cache(cache), values(values), stream(stream), mr(mr) { } @@ -80,7 +79,6 @@ struct store_result_functor { }; protected: - size_type col_idx; ///< Index of column in requests being operated on sort::sort_groupby_helper& helper; ///< Sort helper cudf::detail::result_cache& cache; ///< cache of results to store into column_view const& values; ///< Column of values to group and aggregate diff --git a/cpp/src/groupby/sort/scan.cpp b/cpp/src/groupby/sort/scan.cpp index dade6881bbd..ace5d0e539c 100644 --- a/cpp/src/groupby/sort/scan.cpp +++ b/cpp/src/groupby/sort/scan.cpp @@ -68,10 +68,10 @@ struct scan_result_functor final : store_result_functor { template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::sum_scan( get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr)); @@ -80,10 +80,10 @@ void scan_result_functor::operator()(aggregation const& agg) template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::min_scan( get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr)); @@ -92,10 +92,10 @@ void scan_result_functor::operator()(aggregation const& agg) template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; cache.add_result( - col_idx, + values, agg, detail::max_scan( get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr)); @@ -104,15 +104,15 @@ void scan_result_functor::operator()(aggregation const& agg) template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; - cache.add_result(col_idx, agg, detail::count_scan(helper.group_labels(stream), stream, mr)); + cache.add_result(values, agg, detail::count_scan(helper.group_labels(stream), stream, mr)); } template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; CUDF_EXPECTS(helper.is_presorted(), "Rank aggregate in groupby scan requires the keys to be presorted"); auto const order_by = get_grouped_values(); @@ -120,7 +120,7 @@ void scan_result_functor::operator()(aggregation const& agg) "Unsupported list type in grouped rank scan."); cache.add_result( - col_idx, + values, agg, detail::rank_scan( order_by, helper.group_labels(stream), helper.group_offsets(stream), stream, mr)); @@ -129,7 +129,7 @@ void scan_result_functor::operator()(aggregation const& agg) template <> void scan_result_functor::operator()(aggregation const& agg) { - if (cache.has_result(col_idx, agg)) return; + if (cache.has_result(values, agg)) return; CUDF_EXPECTS(helper.is_presorted(), "Dense rank aggregate in groupby scan requires the keys to be presorted"); auto const order_by = get_grouped_values(); @@ -137,7 +137,7 @@ void scan_result_functor::operator()(aggregation const& "Unsupported list type in grouped dense_rank scan."); cache.add_result( - col_idx, + values, agg, detail::dense_rank_scan( order_by, helper.group_labels(stream), helper.group_offsets(stream), stream, mr)); @@ -155,10 +155,9 @@ std::pair, std::vector> groupby::sort // sum and count. std depends on mean and count cudf::detail::result_cache cache(requests.size()); - for (size_t i = 0; i < requests.size(); i++) { - auto store_functor = - detail::scan_result_functor(i, requests[i].values, helper(), cache, stream, mr); - for (auto const& aggregation : requests[i].aggregations) { + for (auto const& request : requests) { + auto store_functor = detail::scan_result_functor(request.values, helper(), cache, stream, mr); + for (auto const& aggregation : request.aggregations) { // TODO (dm): single pass compute all supported reductions cudf::detail::aggregation_dispatcher(aggregation->kind, store_functor, *aggregation); }