Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update groupby result_cache to allow sharing intermediate results based on column_view instead of requests. #9195

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
d421d6d
add shallow_hash(column_view)
karthikeyann Sep 7, 2021
9c4a9f3
add CompoundTypes to type_lists
karthikeyann Sep 7, 2021
a3dd235
add shallow_hash tests
karthikeyann Sep 7, 2021
2365d07
add column copy test
karthikeyann Sep 7, 2021
88726a4
add shallow_equal(column_view) and tests
karthikeyann Sep 7, 2021
d52509d
update result_cache to use shallow_hash, shallow_equal
karthikeyann Sep 8, 2021
d9a8bd7
Update cpp/include/cudf/column/column_view.hpp
karthikeyann Sep 8, 2021
7e7f250
ignore data, nullmask, offset if parent size is empty
karthikeyann Sep 13, 2021
d1d5c3c
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 13, 2021
0005154
is_shallow_equal ignore children states for empty column. (not childr…
karthikeyann Sep 13, 2021
e692053
for empty column, ignore child pointers in shallow_hash
karthikeyann Sep 14, 2021
44372bc
rename is_shallow_equal to is_shallow_equivalent
karthikeyann Sep 14, 2021
ecc3a7d
use hash_combine for shallow hash
karthikeyann Sep 16, 2021
d2cd468
Apply suggestions from code review (jake)
karthikeyann Sep 16, 2021
fa40847
address review comments
karthikeyann Sep 17, 2021
f709b2a
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 17, 2021
6ac5725
update after PR #9185 updates
karthikeyann Sep 17, 2021
e863bc7
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into enh-grou…
karthikeyann Sep 17, 2021
f66fdd9
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into fea-shal…
karthikeyann Sep 18, 2021
e36b834
add boost license for hash_combine, move to diff header
karthikeyann Sep 18, 2021
a1ff894
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 18, 2021
1fbe3fc
Apply suggestions from code review (jake)
karthikeyann Sep 18, 2021
79ca5e5
Merge branches 'enh-groupby_cache_hashed' and 'fea-shallow_hash_colum…
karthikeyann Sep 18, 2021
fc3cc6b
include cleanup
karthikeyann Sep 18, 2021
eb2b0db
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 18, 2021
0593955
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 18, 2021
f7b6bb6
add missing include due to reorg
karthikeyann Sep 18, 2021
9f19ddf
Apply suggestions from code review (jake)
karthikeyann Sep 20, 2021
98bbc94
fix duplicate {col, agg} request extract
karthikeyann Sep 20, 2021
9581525
address review comments
karthikeyann Sep 20, 2021
dcb0668
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 20, 2021
243490b
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into enh-grou…
karthikeyann Sep 20, 2021
1a5f367
Update cpp/src/column/column_view.cpp
karthikeyann Sep 21, 2021
8fa765c
Merge branch 'fea-shallow_hash_columnview' of github.com:karthikeyann…
karthikeyann Sep 22, 2021
e10ca8c
Merge branch 'branch-21.12' of github.com:rapidsai/cudf into enh-grou…
karthikeyann Sep 24, 2021
68c4213
merge fix
karthikeyann Sep 24, 2021
3abeb1c
Merge branch 'rapidsai:branch-21.12' into enh-groupby_cache_hashed
karthikeyann Sep 27, 2021
ece8279
Update cpp/include/cudf/detail/aggregation/result_cache.hpp (vyasr)
karthikeyann Oct 4, 2021
7d7eda5
Merge branch 'branch-21.12' into enh-groupby_cache_hashed
karthikeyann Oct 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions cpp/include/cudf/detail/aggregation/result_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@

#include <cudf/column/column.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/hashing.hpp>
#include <cudf/types.hpp>

#include <unordered_map>

namespace cudf {
namespace detail {
struct aggregation_equality {
bool operator()(aggregation const& lhs, aggregation const& rhs) const
struct pair_column_aggregation_equal_to {
bool operator()(std::pair<column_view, aggregation const&> const& lhs,
std::pair<column_view, aggregation const&> const& rhs) const
{
return lhs.is_equal(rhs);
return is_shallow_equivalent(lhs.first, rhs.first) and lhs.second.is_equal(rhs.second);
}
};

struct aggregation_hash {
size_t operator()(aggregation const& key) const noexcept { return key.do_hash(); }
struct pair_column_aggregation_hash {
size_t operator()(std::pair<column_view, aggregation const&> const& key) const
{
return hash_combine(shallow_hash(key.first), key.second.do_hash());
}
};

class result_cache {
Expand All @@ -43,19 +49,19 @@ class result_cache {

result_cache(size_t num_columns) : _cache(num_columns) {}

bool has_result(size_t col_idx, aggregation const& agg) const;
bool has_result(column_view const& input, aggregation const& agg) const;

void add_result(size_t col_idx, aggregation const& agg, std::unique_ptr<column>&& col);
void add_result(column_view const& input, aggregation const& agg, std::unique_ptr<column>&& col);

column_view get_result(size_t col_idx, aggregation const& agg) const;
column_view get_result(column_view const& input, aggregation const& agg) const;

std::unique_ptr<column> release_result(size_t col_idx, aggregation const& agg);
std::unique_ptr<column> release_result(column_view const& input, aggregation const& agg);

private:
std::vector<std::unordered_map<std::reference_wrapper<aggregation const>,
std::pair<std::unique_ptr<aggregation>, std::unique_ptr<column>>,
aggregation_hash,
aggregation_equality>>
std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
std::pair<std::unique_ptr<aggregation>, std::unique_ptr<column>>,
pair_column_aggregation_hash,
pair_column_aggregation_equal_to>
_cache;
};

Expand Down
37 changes: 17 additions & 20 deletions cpp/src/aggregation/result_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,36 @@
namespace cudf {
namespace detail {

bool result_cache::has_result(size_t col_idx, aggregation const& agg) const
bool result_cache::has_result(column_view const& input, aggregation const& agg) const
{
if (col_idx > _cache.size()) return false;

auto result_it = _cache[col_idx].find(agg);

return (result_it != _cache[col_idx].end());
return _cache.count({input, agg});
}

void result_cache::add_result(size_t col_idx, aggregation const& agg, std::unique_ptr<column>&& col)
void result_cache::add_result(column_view const& input,
aggregation const& agg,
std::unique_ptr<column>&& col)
{
// We can't guarantee that agg will outlive the cache, so we need to take ownership of a copy.
// To allow lookup by reference, make the key a reference and keep the owner in the value pair.
auto owned_agg = agg.clone();
auto const& key = *owned_agg;
auto value = std::make_pair(std::move(owned_agg), std::move(col));
_cache[col_idx].emplace(key, std::move(value));
auto owned_agg = agg.clone();
auto const& key = *owned_agg;
auto value = std::make_pair(std::move(owned_agg), std::move(col));
_cache[{input, key}] = std::move(value);
}

column_view result_cache::get_result(size_t col_idx, aggregation const& agg) const
column_view result_cache::get_result(column_view const& input, aggregation const& agg) const
{
CUDF_EXPECTS(has_result(col_idx, agg), "Result does not exist in cache");

auto result_it = _cache[col_idx].find(agg);
auto result_it = _cache.find({input, agg});
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
CUDF_EXPECTS(result_it != _cache.end(), "Result does not exist in cache");
return result_it->second.second->view();
}

std::unique_ptr<column> result_cache::release_result(size_t col_idx, aggregation const& agg)
std::unique_ptr<column> result_cache::release_result(column_view const& input,
aggregation const& agg)
{
CUDF_EXPECTS(has_result(col_idx, agg), "Result does not exist in cache");

auto result_it = _cache[col_idx].extract(agg);
return std::move(result_it.mapped().second);
auto node = _cache.extract({input, agg});
CUDF_EXPECTS(not node.empty(), "Result does not exist in cache");
return std::move(node.mapped().second);
}

} // namespace detail
Expand Down
20 changes: 18 additions & 2 deletions cpp/src/groupby/common/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/groupby.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <vector>

namespace cudf {
Expand All @@ -30,10 +32,24 @@ inline std::vector<aggregation_result> extract_results(host_span<RequestType con
cudf::detail::result_cache& cache)
{
std::vector<aggregation_result> results(requests.size());

std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
column_view,
cudf::detail::pair_column_aggregation_hash,
cudf::detail::pair_column_aggregation_equal_to>
repeated_result;
for (size_t i = 0; i < requests.size(); i++) {
for (auto&& agg : requests[i].aggregations) {
results[i].results.emplace_back(cache.release_result(i, *agg));
if (cache.has_result(requests[i].values, *agg)) {
results[i].results.emplace_back(cache.release_result(requests[i].values, *agg));
repeated_result[{requests[i].values, *agg}] = results[i].results.back()->view();
} else {
auto it = repeated_result.find({requests[i].values, *agg});
if (it != repeated_result.end()) {
results[i].results.emplace_back(std::make_unique<column>(it->second));
} else {
CUDF_FAIL("Cannot extract result from the cache");
}
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return results;
Expand Down
95 changes: 42 additions & 53 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class groupby_simple_aggregations_collector final

template <typename Map>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
size_t col_idx;
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
Expand All @@ -176,17 +175,15 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
public:
using cudf::detail::aggregation_finalizer::visit;

hash_compound_agg_finalizer(size_t col_idx,
column_view col,
hash_compound_agg_finalizer(column_view col,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
Map const& map,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col_idx(col_idx),
col(col),
: col(col),
sparse_results(sparse_results),
dense_results(dense_results),
gather_map(gather_map),
Expand All @@ -201,8 +198,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final

auto to_dense_agg_result(cudf::aggregation const& agg)
{
auto s = sparse_results->get_result(col_idx, agg);

auto s = sparse_results->get_result(col, agg);
auto dense_result_table = cudf::detail::gather(table_view({std::move(s)}),
gather_map,
out_of_bounds_policy::DONT_CHECK,
Expand Down Expand Up @@ -239,43 +235,43 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
// Declare overloads for each kind of aggregation to dispatch
void visit(cudf::aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
dense_results->add_result(col_idx, agg, to_dense_agg_result(agg));
if (dense_results->has_result(col, agg)) return;
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}

void visit(cudf::detail::min_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;
if (result_type.id() == type_id::STRING) {
auto transformed_agg = make_argmin_aggregation();
dense_results->add_result(col_idx, agg, gather_argminmax(*transformed_agg));
dense_results->add_result(col, agg, gather_argminmax(*transformed_agg));
} else {
dense_results->add_result(col_idx, agg, to_dense_agg_result(agg));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}
}

void visit(cudf::detail::max_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;

if (result_type.id() == type_id::STRING) {
auto transformed_agg = make_argmax_aggregation();
dense_results->add_result(col_idx, agg, gather_argminmax(*transformed_agg));
dense_results->add_result(col, agg, gather_argminmax(*transformed_agg));
} else {
dense_results->add_result(col_idx, agg, to_dense_agg_result(agg));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}
}

void visit(cudf::detail::mean_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;

auto sum_agg = make_sum_aggregation();
auto count_agg = make_count_aggregation();
this->visit(*sum_agg);
this->visit(*count_agg);
column_view sum_result = dense_results->get_result(col_idx, *sum_agg);
column_view count_result = dense_results->get_result(col_idx, *count_agg);
column_view sum_result = dense_results->get_result(col, *sum_agg);
column_view count_result = dense_results->get_result(col, *count_agg);

auto result =
cudf::detail::binary_operation(sum_result,
Expand All @@ -284,19 +280,19 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::target_type(result_type, aggregation::MEAN),
stream,
mr);
dense_results->add_result(col_idx, agg, std::move(result));
dense_results->add_result(col, agg, std::move(result));
}

void visit(cudf::detail::var_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;

auto sum_agg = make_sum_aggregation();
auto count_agg = make_count_aggregation();
this->visit(*sum_agg);
this->visit(*count_agg);
column_view sum_result = sparse_results->get_result(col_idx, *sum_agg);
column_view count_result = sparse_results->get_result(col_idx, *count_agg);
column_view sum_result = sparse_results->get_result(col, *sum_agg);
column_view count_result = sparse_results->get_result(col, *count_agg);

auto values_view = column_device_view::create(col);
auto sum_view = column_device_view::create(sum_result);
Expand All @@ -314,47 +310,40 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
col.size(),
::cudf::detail::var_hash_functor<Map>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col_idx, agg, std::move(var_result));
dense_results->add_result(col_idx, agg, to_dense_agg_result(agg));
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}

void visit(cudf::detail::std_aggregation const& agg) override
{
if (dense_results->has_result(col_idx, agg)) return;
if (dense_results->has_result(col, agg)) return;
auto var_agg = make_variance_aggregation(agg._ddof);
this->visit(*dynamic_cast<cudf::detail::var_aggregation*>(var_agg.get()));
column_view variance = dense_results->get_result(col_idx, *var_agg);
column_view variance = dense_results->get_result(col, *var_agg);

auto result = cudf::detail::unary_operation(variance, unary_operator::SQRT, stream, mr);
dense_results->add_result(col_idx, agg, std::move(result));
dense_results->add_result(col, agg, std::move(result));
}
};
// Flatten the requested aggregations into the simple aggregations that can be computed in a single pass
std::tuple<table_view,
std::vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<size_t>>
std::tuple<table_view, std::vector<aggregation::Kind>, std::vector<std::unique_ptr<aggregation>>>
flatten_single_pass_aggs(host_span<aggregation_request const> requests)
{
std::vector<column_view> columns;
std::vector<std::unique_ptr<aggregation>> aggs;
std::vector<aggregation::Kind> agg_kinds;
std::vector<size_t> col_ids;

for (size_t i = 0; i < requests.size(); i++) {
auto const& request = requests[i];
auto const& agg_v = request.aggregations;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;

std::unordered_set<aggregation::Kind> agg_kinds_set;
auto insert_agg =
[&](size_t i, column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
col_ids.push_back(i);
}
};
auto insert_agg = [&](column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
}
};

auto values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
Expand All @@ -363,13 +352,12 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
groupby_simple_aggregations_collector collector;

for (auto& agg_s : agg->get_simple_aggregations(values_type, collector)) {
insert_agg(i, request.values, std::move(agg_s));
insert_agg(request.values, std::move(agg_s));
}
}
}

return std::make_tuple(
table_view(columns), std::move(agg_kinds), std::move(aggs), std::move(col_ids));
return std::make_tuple(table_view(columns), std::move(agg_kinds), std::move(aggs));
}

/**
Expand All @@ -396,14 +384,14 @@ void sparse_to_dense_results(table_view const& keys,
bitmask_type const* row_bitmask_ptr =
skip_key_rows_with_nulls ? static_cast<bitmask_type*>(row_bitmask.data()) : nullptr;

for (size_t i = 0; i < requests.size(); i++) {
auto const& agg_v = requests[i].aggregations;
auto const& col = requests[i].values;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;
auto const& col = request.values;

// Given an aggregation, the finalizer reads its result from sparse_results,
// converts it into a dense, compacted result, and stores it in dense_results
auto finalizer = hash_compound_agg_finalizer<Map>(
i, col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
for (auto&& agg : agg_v) {
agg->finalize(finalizer);
}
Expand Down Expand Up @@ -491,7 +479,7 @@ void compute_single_pass_aggs(table_view const& keys,
rmm::cuda_stream_view stream)
{
// flatten the aggs to a table that can be operated on by aggregate_row
auto const [flattened_values, agg_kinds, aggs, col_ids] = flatten_single_pass_aggs(requests);
auto const [flattened_values, agg_kinds, aggs] = flatten_single_pass_aggs(requests);

// make table that will hold sparse results
table sparse_table = create_sparse_results_table(flattened_values, agg_kinds, stream);
Expand Down Expand Up @@ -519,7 +507,8 @@ void compute_single_pass_aggs(table_view const& keys,
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
// Note that the cache will make a copy of this temporary aggregation
sparse_results->add_result(col_ids[i], *aggs[i], std::move(sparse_result_cols[i]));
sparse_results->add_result(
flattened_values.column(i), *aggs[i], std::move(sparse_result_cols[i]));
}
}

Expand Down
Loading