Skip to content

Commit

Permalink
Update groupby result_cache to allow sharing intermediate results bas…
Browse files Browse the repository at this point in the history
…ed on column_view instead of requests. (#9195)

This PR updates groupby result_cache to use `pair<column_view, aggregation>` as the key of its unordered_map.
This allows intermediate results to be cached based on the column view, so results computed for child column_views can be cached and reused by other aggregation_requests.

Depends on #9185
shallow_hash and is_shallow_equivalent are used for column_view.

Additional context:
This change is required to cache intermediate results for child columns in #9154, and it allows those results to be shared across multiple aggregation requests.

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #9195
  • Loading branch information
karthikeyann authored Oct 5, 2021
1 parent 2f75ff3 commit 1424a2d
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 171 deletions.
32 changes: 19 additions & 13 deletions cpp/include/cudf/detail/aggregation/result_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@

#include <cudf/column/column.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/hashing.hpp>
#include <cudf/types.hpp>

#include <unordered_map>

namespace cudf {
namespace detail {
/**
 * @brief Equality functor for result-cache keys made of a column view and an aggregation.
 *
 * Two keys are considered equal when `is_shallow_equivalent` holds for their column views
 * and `aggregation::is_equal` holds for their aggregations.
 */
struct pair_column_aggregation_equal_to {
  bool operator()(std::pair<column_view, aggregation const&> const& lhs,
                  std::pair<column_view, aggregation const&> const& rhs) const
  {
    // Compare the views first; the aggregations are only compared when the views match.
    if (not is_shallow_equivalent(lhs.first, rhs.first)) { return false; }
    return lhs.second.is_equal(rhs.second);
  }
};

struct aggregation_hash {
size_t operator()(aggregation const& key) const noexcept { return key.do_hash(); }
struct pair_column_aggregation_hash {
size_t operator()(std::pair<column_view, aggregation const&> const& key) const
{
return hash_combine(shallow_hash(key.first), key.second.do_hash());
}
};

class result_cache {
Expand All @@ -43,19 +49,19 @@ class result_cache {

result_cache(size_t num_columns) : _cache(num_columns) {}

bool has_result(size_t col_idx, aggregation const& agg) const;
bool has_result(column_view const& input, aggregation const& agg) const;

void add_result(size_t col_idx, aggregation const& agg, std::unique_ptr<column>&& col);
void add_result(column_view const& input, aggregation const& agg, std::unique_ptr<column>&& col);

column_view get_result(size_t col_idx, aggregation const& agg) const;
column_view get_result(column_view const& input, aggregation const& agg) const;

std::unique_ptr<column> release_result(size_t col_idx, aggregation const& agg);
std::unique_ptr<column> release_result(column_view const& input, aggregation const& agg);

private:
std::vector<std::unordered_map<std::reference_wrapper<aggregation const>,
std::pair<std::unique_ptr<aggregation>, std::unique_ptr<column>>,
aggregation_hash,
aggregation_equality>>
std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
std::pair<std::unique_ptr<aggregation>, std::unique_ptr<column>>,
pair_column_aggregation_hash,
pair_column_aggregation_equal_to>
_cache;
};

Expand Down
37 changes: 17 additions & 20 deletions cpp/src/aggregation/result_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,36 @@
namespace cudf {
namespace detail {

bool result_cache::has_result(size_t col_idx, aggregation const& agg) const
bool result_cache::has_result(column_view const& input, aggregation const& agg) const
{
if (col_idx > _cache.size()) return false;

auto result_it = _cache[col_idx].find(agg);

return (result_it != _cache[col_idx].end());
return _cache.count({input, agg});
}

void result_cache::add_result(size_t col_idx, aggregation const& agg, std::unique_ptr<column>&& col)
void result_cache::add_result(column_view const& input,
aggregation const& agg,
std::unique_ptr<column>&& col)
{
// We can't guarantee that agg will outlive the cache, so we need to take ownership of a copy.
// To allow lookup by reference, make the key a reference and keep the owner in the value pair.
auto owned_agg = agg.clone();
auto const& key = *owned_agg;
auto value = std::make_pair(std::move(owned_agg), std::move(col));
_cache[col_idx].emplace(key, std::move(value));
auto owned_agg = agg.clone();
auto const& key = *owned_agg;
auto value = std::make_pair(std::move(owned_agg), std::move(col));
_cache[{input, key}] = std::move(value);
}

/**
 * @brief Returns a view of the cached result for the (column view, aggregation) key.
 *
 * The cache keeps ownership of the underlying column. `CUDF_EXPECTS` fails with
 * "Result does not exist in cache" when no entry is found.
 *
 * @param input Column view part of the key
 * @param agg Aggregation part of the key
 * @return View of the cached result column
 */
column_view result_cache::get_result(column_view const& input, aggregation const& agg) const
{
  auto const entry = _cache.find({input, agg});
  CUDF_EXPECTS(entry != _cache.end(), "Result does not exist in cache");

  // The mapped value is (owned aggregation, result column); only the column is needed here.
  auto const& cached_column = entry->second.second;
  return cached_column->view();
}

std::unique_ptr<column> result_cache::release_result(size_t col_idx, aggregation const& agg)
std::unique_ptr<column> result_cache::release_result(column_view const& input,
aggregation const& agg)
{
CUDF_EXPECTS(has_result(col_idx, agg), "Result does not exist in cache");

auto result_it = _cache[col_idx].extract(agg);
return std::move(result_it.mapped().second);
auto node = _cache.extract({input, agg});
CUDF_EXPECTS(not node.empty(), "Result does not exist in cache");
return std::move(node.mapped().second);
}

} // namespace detail
Expand Down
20 changes: 18 additions & 2 deletions cpp/src/groupby/common/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/groupby.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <vector>

namespace cudf {
Expand All @@ -30,10 +32,24 @@ inline std::vector<aggregation_result> extract_results(host_span<RequestType con
cudf::detail::result_cache& cache)
{
std::vector<aggregation_result> results(requests.size());

std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
column_view,
cudf::detail::pair_column_aggregation_hash,
cudf::detail::pair_column_aggregation_equal_to>
repeated_result;
for (size_t i = 0; i < requests.size(); i++) {
for (auto&& agg : requests[i].aggregations) {
results[i].results.emplace_back(cache.release_result(i, *agg));
if (cache.has_result(requests[i].values, *agg)) {
results[i].results.emplace_back(cache.release_result(requests[i].values, *agg));
repeated_result[{requests[i].values, *agg}] = results[i].results.back()->view();
} else {
auto it = repeated_result.find({requests[i].values, *agg});
if (it != repeated_result.end()) {
results[i].results.emplace_back(std::make_unique<column>(it->second));
} else {
CUDF_FAIL("Cannot extract result from the cache");
}
}
}
}
return results;
Expand Down
95 changes: 42 additions & 53 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class groupby_simple_aggregations_collector final

template <typename Map>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
size_t col_idx;
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
Expand All @@ -176,17 +175,15 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
public:
using cudf::detail::aggregation_finalizer::visit;

hash_compound_agg_finalizer(size_t col_idx,
column_view col,
hash_compound_agg_finalizer(column_view col,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
Map const& map,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col_idx(col_idx),
col(col),
: col(col),
sparse_results(sparse_results),
dense_results(dense_results),
gather_map(gather_map),
Expand All @@ -201,8 +198,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final

auto to_dense_agg_result(cudf::aggregation const& agg)
{
auto s = sparse_results->get_result(col_idx, agg);

auto s = sparse_results->get_result(col, agg);
auto dense_result_table = cudf::detail::gather(table_view({std::move(s)}),
gather_map,
out_of_bounds_policy::DONT_CHECK,
Expand Down Expand Up @@ -239,43 +235,43 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
// Declare overloads for each kind of aggregation to dispatch
// Generic overload: if no dense result is cached for (col, agg) yet, convert the sparse
// result to its dense form and cache it.
void visit(cudf::aggregation const& agg) override
{
  if (not dense_results->has_result(col, agg)) {
    dense_results->add_result(col, agg, to_dense_agg_result(agg));
  }
}

// MIN overload: does nothing when a dense result is already cached for (col, agg).
void visit(cudf::detail::min_aggregation const& agg) override
{
  if (dense_results->has_result(col, agg)) { return; }

  if (result_type.id() != type_id::STRING) {
    dense_results->add_result(col, agg, to_dense_agg_result(agg));
    return;
  }

  // STRING results are produced through an ARGMIN aggregation and gather_argminmax instead.
  // NOTE(review): presumably gathers the values at the argmin indices; confirm against
  // gather_argminmax, which is not visible here.
  auto argmin_agg = make_argmin_aggregation();
  dense_results->add_result(col, agg, gather_argminmax(*argmin_agg));
}

// MAX overload: does nothing when a dense result is already cached for (col, agg).
void visit(cudf::detail::max_aggregation const& agg) override
{
  if (dense_results->has_result(col, agg)) { return; }

  if (result_type.id() != type_id::STRING) {
    dense_results->add_result(col, agg, to_dense_agg_result(agg));
    return;
  }

  // STRING results are produced through an ARGMAX aggregation and gather_argminmax instead.
  // NOTE(review): presumably gathers the values at the argmax indices; confirm against
  // gather_argminmax, which is not visible here.
  auto argmax_agg = make_argmax_aggregation();
  dense_results->add_result(col, agg, gather_argminmax(*argmax_agg));
}

void visit(cudf::detail::mean_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;

auto sum_agg = make_sum_aggregation();
auto count_agg = make_count_aggregation();
this->visit(*sum_agg);
this->visit(*count_agg);
column_view sum_result = dense_results->get_result(col_idx, *sum_agg);
column_view count_result = dense_results->get_result(col_idx, *count_agg);
column_view sum_result = dense_results->get_result(col, *sum_agg);
column_view count_result = dense_results->get_result(col, *count_agg);

auto result =
cudf::detail::binary_operation(sum_result,
Expand All @@ -284,19 +280,19 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::target_type(result_type, aggregation::MEAN),
stream,
mr);
dense_results->add_result(col_idx, agg, std::move(result));
dense_results->add_result(col, agg, std::move(result));
}

void visit(cudf::detail::var_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;

auto sum_agg = make_sum_aggregation();
auto count_agg = make_count_aggregation();
this->visit(*sum_agg);
this->visit(*count_agg);
column_view sum_result = sparse_results->get_result(col_idx, *sum_agg);
column_view count_result = sparse_results->get_result(col_idx, *count_agg);
column_view sum_result = sparse_results->get_result(col, *sum_agg);
column_view count_result = sparse_results->get_result(col, *count_agg);

auto values_view = column_device_view::create(col);
auto sum_view = column_device_view::create(sum_result);
Expand All @@ -314,47 +310,40 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
col.size(),
::cudf::detail::var_hash_functor<Map>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col_idx, agg, std::move(var_result));
dense_results->add_result(col_idx, agg, to_dense_agg_result(agg));
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}

// STD overload: finalizes the variance with the same ddof, then caches its element-wise SQRT
// as the dense result for (col, agg). Does nothing when that result is already cached.
void visit(cudf::detail::std_aggregation const& agg) override
{
  if (dense_results->has_result(col, agg)) { return; }

  // Dispatch through the var_aggregation overload so the variance lands in dense_results.
  auto variance_agg        = make_variance_aggregation(agg._ddof);
  auto* const as_var_agg   = dynamic_cast<cudf::detail::var_aggregation*>(variance_agg.get());
  this->visit(*as_var_agg);

  column_view variance_view = dense_results->get_result(col, *variance_agg);
  dense_results->add_result(
    col, agg, cudf::detail::unary_operation(variance_view, unary_operator::SQRT, stream, mr));
}
};
// flatten aggs to filter in single pass aggs
std::tuple<table_view,
std::vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<size_t>>
std::tuple<table_view, std::vector<aggregation::Kind>, std::vector<std::unique_ptr<aggregation>>>
flatten_single_pass_aggs(host_span<aggregation_request const> requests)
{
std::vector<column_view> columns;
std::vector<std::unique_ptr<aggregation>> aggs;
std::vector<aggregation::Kind> agg_kinds;
std::vector<size_t> col_ids;

for (size_t i = 0; i < requests.size(); i++) {
auto const& request = requests[i];
auto const& agg_v = request.aggregations;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;

std::unordered_set<aggregation::Kind> agg_kinds_set;
auto insert_agg =
[&](size_t i, column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
col_ids.push_back(i);
}
};
auto insert_agg = [&](column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
}
};

auto values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
Expand All @@ -363,13 +352,12 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
groupby_simple_aggregations_collector collector;

for (auto& agg_s : agg->get_simple_aggregations(values_type, collector)) {
insert_agg(i, request.values, std::move(agg_s));
insert_agg(request.values, std::move(agg_s));
}
}
}

return std::make_tuple(
table_view(columns), std::move(agg_kinds), std::move(aggs), std::move(col_ids));
return std::make_tuple(table_view(columns), std::move(agg_kinds), std::move(aggs));
}

/**
Expand All @@ -396,14 +384,14 @@ void sparse_to_dense_results(table_view const& keys,
bitmask_type const* row_bitmask_ptr =
skip_key_rows_with_nulls ? static_cast<bitmask_type*>(row_bitmask.data()) : nullptr;

for (size_t i = 0; i < requests.size(); i++) {
auto const& agg_v = requests[i].aggregations;
auto const& col = requests[i].values;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;
auto const& col = request.values;

// Given an aggregation, this will get the result from sparse_results and
// convert and return dense, compacted result
auto finalizer = hash_compound_agg_finalizer<Map>(
i, col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
for (auto&& agg : agg_v) {
agg->finalize(finalizer);
}
Expand Down Expand Up @@ -491,7 +479,7 @@ void compute_single_pass_aggs(table_view const& keys,
rmm::cuda_stream_view stream)
{
// flatten the aggs to a table that can be operated on by aggregate_row
auto const [flattened_values, agg_kinds, aggs, col_ids] = flatten_single_pass_aggs(requests);
auto const [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests);

// make table that will hold sparse results
table sparse_table = create_sparse_results_table(flattened_values, agg_kinds, stream);
Expand Down Expand Up @@ -519,7 +507,8 @@ void compute_single_pass_aggs(table_view const& keys,
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
// Note that the cache will make a copy of this temporary aggregation
sparse_results->add_result(col_ids[i], *aggs[i], std::move(sparse_result_cols[i]));
sparse_results->add_result(
flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i]));
}
}

Expand Down
Loading

0 comments on commit 1424a2d

Please sign in to comment.