Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update groupby::hash to use new row operators for keys #10770

Merged
merged 33 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d335153
Use new row hasher and comparator
PointKernel May 2, 2022
b227ec4
Get rid of Map template
PointKernel May 2, 2022
3deb64b
Fix a bug: update the lifecycle of preprocessed table
PointKernel May 5, 2022
1c7d9f4
Merge remote-tracking branch 'upstream/branch-22.06' into groupby-new…
PointKernel May 5, 2022
bf94a8d
Get rid of flattened columns
PointKernel May 6, 2022
80d8f87
Fix a bug: keys always have nulls
PointKernel May 9, 2022
5f704ec
Pass shared_ptr of preprocessed table by value
PointKernel May 9, 2022
6de7c0b
Add structs argmax unit tests
PointKernel May 10, 2022
70d740f
Add basic list tests
PointKernel May 10, 2022
1a70016
Add all null input tests
PointKernel May 10, 2022
965eba4
Add lists with nulls tests
PointKernel May 11, 2022
762bf69
Fix a lifetime bug for row operators
PointKernel May 11, 2022
a000e65
Fix a bug: check nested nulls when initing row operators
PointKernel May 12, 2022
208f224
Add const + comments
PointKernel May 12, 2022
7232a20
Merge remote-tracking branch 'upstream/branch-22.06' into groupby-new…
PointKernel May 12, 2022
b6346e5
Consistently use has_nested_nulls
PointKernel May 12, 2022
2f70d8f
Use auto const consistently
PointKernel May 17, 2022
4e6de36
Remove unused parameter
PointKernel May 17, 2022
935ccf6
Move test to proper file
PointKernel May 17, 2022
bd27723
Minor cleanups
PointKernel May 18, 2022
d4724be
Add group struct keys benchmark
PointKernel May 19, 2022
9aa2f8d
Remove unnecessary sync
PointKernel May 19, 2022
055c31a
Remove unused parameter
PointKernel May 20, 2022
abdb431
Update unit test
PointKernel May 20, 2022
70ca9a0
Improvement: use flattened keys to compute row bitmask
PointKernel May 20, 2022
ece4321
Add tests for lists with null elements
PointKernel May 24, 2022
55902dd
Minor cleanups
PointKernel May 24, 2022
9086f33
Add exception to null exclude case
PointKernel May 24, 2022
6f170d8
Revert changes to match pandas dropna behavior
PointKernel May 24, 2022
c8b1aab
Update unit tests to exercise null elements in list keys
PointKernel May 24, 2022
002ad40
Minor cleanup
PointKernel May 24, 2022
b76677c
Remove unused header
PointKernel May 24, 2022
efd497e
Throw when null structs are excluded
PointKernel May 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions cpp/src/groupby/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::disp
// satisfied with a hash implementation
if (_keys_are_sorted == sorted::NO and not _helper and
detail::hash::can_use_hash_groupby(_keys, requests)) {
// Optionally flatten nested key columns.
auto flattened = flatten_nested_columns(_keys, {}, {}, column_nullability::FORCE);
auto flattened_keys = flattened.flattened_columns();
auto is_supported_key_type = [](auto col) { return cudf::is_equality_comparable(col.type()); };
CUDF_EXPECTS(std::all_of(flattened_keys.begin(), flattened_keys.end(), is_supported_key_type),
"Unsupported groupby key type does not support equality comparison");
auto [grouped_keys, results] =
detail::hash::groupby(flattened_keys, requests, _include_null_keys, stream, mr);
return std::pair(unflatten_nested_columns(std::move(grouped_keys), _keys), std::move(results));
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
return detail::hash::groupby(_keys, requests, _include_null_keys, stream, mr);
} else {
return sort_aggregate(requests, stream, mr);
}
Expand Down
84 changes: 42 additions & 42 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/groupby.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/row_operators.cuh>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/table/table_view.hpp>
Expand Down Expand Up @@ -65,6 +65,13 @@ namespace detail {
namespace hash {
namespace {

using map_type = concurrent_unordered_map<
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
cudf::size_type,
cudf::size_type,
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
cudf::experimental::row::hash::device_row_hasher<cudf::detail::default_hash,
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
cudf::nullate::DYNAMIC>,
cudf::experimental::row::equality::device_row_comparator<cudf::nullate::DYNAMIC>>;

/**
* @brief List of aggregation operations that can be computed with a hash-based
* implementation.
Expand Down Expand Up @@ -179,14 +186,13 @@ class groupby_simple_aggregations_collector final
}
};

template <typename Map>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
cudf::detail::result_cache* dense_results;
device_span<size_type const> gather_map;
Map const& map;
map_type const& map;
bitmask_type const* __restrict__ row_bitmask;
rmm::cuda_stream_view stream;
rmm::mr::device_memory_resource* mr;
Expand All @@ -198,7 +204,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
Map const& map,
map_type const& map,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
Expand Down Expand Up @@ -327,7 +333,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
col.size(),
::cudf::detail::var_hash_functor<Map>{
::cudf::detail::var_hash_functor<map_type>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
Expand Down Expand Up @@ -385,14 +391,12 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
*
* @see groupby_null_templated()
*/
template <typename Map>
void sparse_to_dense_results(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
// size_type map_size,
Map const& map,
map_type const& map,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream,
Expand All @@ -409,7 +413,7 @@ void sparse_to_dense_results(table_view const& keys,

// Given an aggregation, this will get the result from sparse_results and
// convert and return dense, compacted result
auto finalizer = hash_compound_agg_finalizer<Map>(
auto finalizer = hash_compound_agg_finalizer(
col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
for (auto&& agg : agg_v) {
agg->finalize(finalizer);
Expand All @@ -421,36 +425,32 @@ void sparse_to_dense_results(table_view const& keys,
* @brief Construct hash map that uses row comparator and row hasher on
* `d_keys` table and stores indices
*/
auto create_hash_map(table_device_view const& d_keys,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream)
auto create_hash_map(
std::shared_ptr<cudf::experimental::row::hash::preprocessed_table> const& preprocessed_keys,
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
cudf::size_type const num_keys,
bool const keys_have_nulls,
null_policy const include_null_keys,
rmm::cuda_stream_view stream)
{
size_type constexpr unused_key{std::numeric_limits<size_type>::max()};
size_type constexpr unused_value{std::numeric_limits<size_type>::max()};

using map_type =
concurrent_unordered_map<size_type,
size_type,
row_hasher<cudf::detail::default_hash, nullate::DYNAMIC>,
row_equality_comparator<nullate::DYNAMIC>>;

using allocator_type = typename map_type::allocator_type;

auto const null_keys_are_equal =
include_null_keys == null_policy::INCLUDE ? null_equality::EQUAL : null_equality::UNEQUAL;
auto const has_null = nullate::DYNAMIC{keys_have_nulls};

row_hasher<cudf::detail::default_hash, nullate::DYNAMIC> hasher{nullate::DYNAMIC{keys_have_nulls},
d_keys};
row_equality_comparator rows_equal{
nullate::DYNAMIC{keys_have_nulls}, d_keys, d_keys, null_keys_are_equal};
cudf::experimental::row::equality::self_comparator row_equal(preprocessed_keys);
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
auto key_equal = row_equal.device_comparator(has_null, null_keys_are_equal);
auto row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_keys);
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

return map_type::create(compute_hash_table_size(d_keys.num_rows()),
return map_type::create(compute_hash_table_size(num_keys),
stream,
unused_key,
unused_value,
hasher,
rows_equal,
row_hash.device_hasher(has_null),
key_equal,
allocator_type());
}

Expand Down Expand Up @@ -491,11 +491,10 @@ auto create_sparse_results_table(table_view const& flattened_values,
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename Map>
void compute_single_pass_aggs(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
Map& map,
map_type& map,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream)
Expand All @@ -509,22 +508,22 @@ void compute_single_pass_aggs(table_view const& keys,
auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream);
auto d_values = table_device_view::create(flattened_values, stream);
auto const d_aggs = cudf::detail::make_device_uvector_async(agg_kinds, stream);

bool skip_key_rows_with_nulls = keys_have_nulls and include_null_keys == null_policy::EXCLUDE;
auto const skip_key_rows_with_nulls =
keys_have_nulls and include_null_keys == null_policy::EXCLUDE;

auto row_bitmask =
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
skip_key_rows_with_nulls ? cudf::detail::bitmask_and(keys, stream).first : rmm::device_buffer{};
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn<Map>{map,
keys.num_rows(),
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
hash::compute_single_pass_aggs_fn<map_type>{map,
keys.num_rows(),
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
Expand All @@ -538,8 +537,7 @@ void compute_single_pass_aggs(table_view const& keys,
* @brief Computes and returns a device vector containing all populated keys in
* `map`.
*/
template <typename Map>
rmm::device_uvector<size_type> extract_populated_keys(Map map,
rmm::device_uvector<size_type> extract_populated_keys(map_type const& map,
size_type num_keys,
rmm::cuda_stream_view stream)
{
Expand Down Expand Up @@ -594,8 +592,10 @@ std::unique_ptr<table> groupby(table_view const& keys,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto d_keys_ptr = table_device_view::create(keys, stream);
auto map = create_hash_map(*d_keys_ptr, keys_have_nulls, include_null_keys, stream);
auto const num_keys = keys.num_rows();
auto preprocessed_keys = cudf::experimental::row::hash::preprocessed_table::create(keys, stream);
auto map =
create_hash_map(preprocessed_keys, num_keys, keys_have_nulls, include_null_keys, stream);

// Cache of sparse results where the location of aggregate value in each
// column is indexed by the hash map
Expand Down Expand Up @@ -670,7 +670,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby(
cudf::detail::result_cache cache(requests.size());

std::unique_ptr<table> unique_keys =
groupby(keys, requests, &cache, has_nulls(keys), include_null_keys, stream, mr);
groupby(keys, requests, &cache, cudf::has_nulls(keys), include_null_keys, stream, mr);

return std::pair(std::move(unique_keys), extract_results(requests, cache, stream, mr));
}
Expand Down