Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fast path for experimental::row::equality #12676

Merged
merged 22 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4a8085a
building equality::self_comparator
divyegala Feb 2, 2023
f71d161
two table comp
divyegala Feb 2, 2023
3ca298c
copyright years
divyegala Feb 2, 2023
7c167a7
centralizing repeated logic
divyegala Feb 2, 2023
0ceb79e
address review to create functors
divyegala Feb 3, 2023
37e7326
updating has_nested_columns docs
divyegala Feb 3, 2023
b44f603
Merge remote-tracking branch 'upstream/branch-23.04' into equality-co…
divyegala Feb 3, 2023
c2ff1fc
address review for underscore prefixes in structs
divyegala Feb 7, 2023
c2ca8ee
Merge remote-tracking branch 'upstream/branch-23.04' into equality-co…
divyegala Feb 7, 2023
ffdf10c
Merge remote-tracking branch 'upstream/branch-23.04' into equality-co…
divyegala Feb 8, 2023
53e918f
add rank
divyegala Feb 8, 2023
65e2bce
fix compile times for rank
divyegala Feb 8, 2023
c6bc7f5
Merge remote-tracking branch 'upstream/branch-23.04' into equality-co…
divyegala Feb 8, 2023
1344e33
Apply suggestions from code review
divyegala Feb 11, 2023
4123379
address review
divyegala Feb 11, 2023
26f38b3
Merge remote-tracking branch 'upstream/branch-23.04' into equality-co…
divyegala Feb 11, 2023
9d0f7a6
address review, mark members of functors as private
divyegala Feb 11, 2023
fe41be8
removing partitioning
divyegala Feb 11, 2023
02dd5c5
simplify lists/contains since it already has a nested-type dispatch m…
divyegala Feb 12, 2023
5db4d03
Merge branch 'branch-23.04' into equality-comp-fast-path
divyegala Feb 13, 2023
9aa23a5
Merge branch 'branch-23.04' into equality-comp-fast-path
divyegala Feb 15, 2023
b52d0f3
Merge branch 'branch-23.04' into equality-comp-fast-path
divyegala Feb 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cpp/include/cudf/table/experimental/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1131,11 +1131,13 @@ struct nan_equal_physical_equality_comparator {
* returns false, representing unequal rows. If the rows are compared without mismatched elements,
* the rows are equal.
*
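* @tparam has_nested_columns compile-time optimization for primitive types: `true` enables the
* element comparison overload for nested types, while `false` disables that overload so that
* only comparisons of non-nested elements are compiled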
divyegala marked this conversation as resolved.
Show resolved Hide resolved
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
* @tparam PhysicalEqualityComparator An equality comparator functor that compares individual values
* rather than logical elements, defaults to a comparator for which `NaN == NaN`.
*/
template <typename Nullate,
template <bool has_nested_columns,
typename Nullate,
typename PhysicalEqualityComparator = nan_equal_physical_equality_comparator>
class device_row_comparator {
friend class self_comparator; ///< Allow self_comparator to access private members
Expand Down Expand Up @@ -1246,14 +1248,14 @@ class device_row_comparator {

template <typename Element,
CUDF_ENABLE_IF(not cudf::is_equality_comparable<Element, Element>() and
not cudf::is_nested<Element>()),
(not has_nested_columns or not cudf::is_nested<Element>())),
typename... Args>
__device__ bool operator()(Args...)
{
CUDF_UNREACHABLE("Attempted to compare elements of uncomparable types.");
}

template <typename Element, CUDF_ENABLE_IF(cudf::is_nested<Element>())>
template <typename Element, CUDF_ENABLE_IF(has_nested_columns and cudf::is_nested<Element>())>
__device__ bool operator()(size_type const lhs_element_index,
size_type const rhs_element_index) const noexcept
{
Expand Down Expand Up @@ -1437,6 +1439,7 @@ class self_comparator {
*
* `F(i,j)` returns true if and only if row `i` compares equal to row `j`.
*
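* @tparam has_nested_columns compile-time optimization for primitive types: pass `true` when the
* table has nested columns (e.g. as reported by `cudf::detail::has_nested_columns`), and `false`
* otherwise to select the comparator specialized for non-nested columns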
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
* @tparam PhysicalEqualityComparator An equality comparator functor that compares individual
* values rather than logical elements, defaults to a comparator for which `NaN == NaN`.
Expand All @@ -1445,13 +1448,15 @@ class self_comparator {
* @param comparator Physical element equality comparison functor.
* @return A binary callable object
*/
template <typename Nullate,
template <bool has_nested_columns,
typename Nullate,
typename PhysicalEqualityComparator = nan_equal_physical_equality_comparator>
auto equal_to(Nullate nullate = {},
null_equality nulls_are_equal = null_equality::EQUAL,
PhysicalEqualityComparator comparator = {}) const noexcept
{
return device_row_comparator{nullate, *d_t, *d_t, nulls_are_equal, comparator};
return device_row_comparator<has_nested_columns, Nullate, PhysicalEqualityComparator>{
nullate, *d_t, *d_t, nulls_are_equal, comparator};
}

private:
Expand Down Expand Up @@ -1539,6 +1544,7 @@ class two_table_comparator {
* Similarly, `F(rhs_index_type i, lhs_index_type j)` returns true if and only if row `i` of the
* right table compares equal to row `j` of the left table.
*
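* @tparam has_nested_columns compile-time optimization for primitive types: pass `true` when
* either table has nested columns (e.g. as reported by `cudf::detail::has_nested_columns`), and
* `false` otherwise to select the comparator specialized for non-nested columns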
* @tparam Nullate A cudf::nullate type describing whether to check for nulls.
* @tparam PhysicalEqualityComparator An equality comparator functor that compares individual
* values rather than logical elements, defaults to a `NaN == NaN` equality comparator.
Expand All @@ -1547,14 +1553,16 @@ class two_table_comparator {
* @param comparator Physical element equality comparison functor.
* @return A binary callable object
*/
template <typename Nullate,
template <bool has_nested_columns,
typename Nullate,
typename PhysicalEqualityComparator = nan_equal_physical_equality_comparator>
auto equal_to(Nullate nullate = {},
null_equality nulls_are_equal = null_equality::EQUAL,
PhysicalEqualityComparator comparator = {}) const noexcept
{
return strong_index_comparator_adapter{
device_row_comparator(nullate, *d_left_table, *d_right_table, nulls_are_equal, comparator)};
device_row_comparator<has_nested_columns, Nullate, PhysicalEqualityComparator>(
nullate, *d_left_table, *d_right_table, nulls_are_equal, comparator)};
}

private:
Expand Down
59 changes: 41 additions & 18 deletions cpp/src/binaryop/compiled/struct_binary_ops.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,26 +125,49 @@ void apply_struct_equality_op(mutable_column_view& out,
auto trhs = table_view{{rhs}};
auto table_comparator =
cudf::experimental::row::equality::two_table_comparator{tlhs, trhs, stream};
auto device_comparator =
table_comparator.equal_to(nullate::DYNAMIC{has_nested_nulls(tlhs) || has_nested_nulls(trhs)},
null_equality::EQUAL,
comparator);

auto outd = column_device_view::create(out, stream);
auto optional_iter =
cudf::detail::make_optional_iterator<bool>(*outd, nullate::DYNAMIC{out.has_nulls()});
thrust::tabulate(rmm::exec_policy(stream),
out.begin<bool>(),
out.end<bool>(),
[optional_iter,
is_lhs_scalar,
is_rhs_scalar,
preserve_output = (op != binary_operator::NOT_EQUAL),
device_comparator] __device__(size_type i) {
auto lhs = cudf::experimental::row::lhs_index_type{is_lhs_scalar ? 0 : i};
auto rhs = cudf::experimental::row::rhs_index_type{is_rhs_scalar ? 0 : i};
return optional_iter[i].has_value() and
(device_comparator(lhs, rhs) == preserve_output);
});

if (cudf::detail::has_nested_columns(tlhs) or cudf::detail::has_nested_columns(trhs)) {
auto device_comparator = table_comparator.equal_to<true>(
nullate::DYNAMIC{has_nested_nulls(tlhs) || has_nested_nulls(trhs)},
null_equality::EQUAL,
comparator);

thrust::tabulate(rmm::exec_policy(stream),
out.begin<bool>(),
out.end<bool>(),
[optional_iter,
is_lhs_scalar,
is_rhs_scalar,
preserve_output = (op != binary_operator::NOT_EQUAL),
device_comparator] __device__(size_type i) {
auto lhs = cudf::experimental::row::lhs_index_type{is_lhs_scalar ? 0 : i};
auto rhs = cudf::experimental::row::rhs_index_type{is_rhs_scalar ? 0 : i};
return optional_iter[i].has_value() and
(device_comparator(lhs, rhs) == preserve_output);
});
divyegala marked this conversation as resolved.
Show resolved Hide resolved
} else {
auto device_comparator = table_comparator.equal_to<false>(
nullate::DYNAMIC{has_nested_nulls(tlhs) || has_nested_nulls(trhs)},
null_equality::EQUAL,
comparator);

thrust::tabulate(rmm::exec_policy(stream),
out.begin<bool>(),
out.end<bool>(),
[optional_iter,
is_lhs_scalar,
is_rhs_scalar,
preserve_output = (op != binary_operator::NOT_EQUAL),
device_comparator] __device__(size_type i) {
auto lhs = cudf::experimental::row::lhs_index_type{is_lhs_scalar ? 0 : i};
auto rhs = cudf::experimental::row::rhs_index_type{is_rhs_scalar ? 0 : i};
return optional_iter[i].has_value() and
(device_comparator(lhs, rhs) == preserve_output);
});
}
}
} // namespace cudf::binops::compiled::detail
139 changes: 80 additions & 59 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,18 @@ namespace {

// TODO: replace it with `cuco::static_map`
// https://github.com/rapidsai/cudf/issues/10401
using map_type = concurrent_unordered_map<
cudf::size_type,
cudf::size_type,
cudf::experimental::row::hash::device_row_hasher<cudf::detail::default_hash,
cudf::nullate::DYNAMIC>,
cudf::experimental::row::equality::device_row_comparator<cudf::nullate::DYNAMIC>>;
template <typename ComparatorType>
using map_type =
concurrent_unordered_map<cudf::size_type,
cudf::size_type,
cudf::experimental::row::hash::
device_row_hasher<cudf::detail::default_hash, cudf::nullate::DYNAMIC>,
ComparatorType>;

template <bool has_nested_columns>
using comparator_type =
cudf::experimental::row::equality::device_row_comparator<has_nested_columns,
cudf::nullate::DYNAMIC>;
divyegala marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief List of aggregation operations that can be computed with a hash-based
Expand Down Expand Up @@ -189,13 +195,14 @@ class groupby_simple_aggregations_collector final
}
};

template <typename ComparatorType>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
cudf::detail::result_cache* dense_results;
device_span<size_type const> gather_map;
map_type const& map;
map_type<ComparatorType> const& map;
bitmask_type const* __restrict__ row_bitmask;
rmm::cuda_stream_view stream;
rmm::mr::device_memory_resource* mr;
Expand All @@ -207,7 +214,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type const& map,
map_type<ComparatorType> const& map,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
Expand Down Expand Up @@ -336,7 +343,7 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
col.size(),
::cudf::detail::var_hash_functor<map_type>{
::cudf::detail::var_hash_functor<map_type<ComparatorType>>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
Expand Down Expand Up @@ -394,12 +401,13 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
*
* @see groupby_null_templated()
*/
template <typename ComparatorType>
void sparse_to_dense_results(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type const& map,
map_type<ComparatorType> const& map,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream,
Expand Down Expand Up @@ -461,10 +469,11 @@ auto create_sparse_results_table(table_view const& flattened_values,
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename ComparatorType>
void compute_single_pass_aggs(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
map_type& map,
map_type<ComparatorType>& map,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream)
Expand All @@ -484,16 +493,16 @@ void compute_single_pass_aggs(table_view const& keys,
auto row_bitmask =
skip_key_rows_with_nulls ? cudf::detail::bitmask_and(keys, stream).first : rmm::device_buffer{};

thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn<map_type>{map,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn<map_type<ComparatorType>>{
map,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
Expand All @@ -507,7 +516,8 @@ void compute_single_pass_aggs(table_view const& keys,
* @brief Computes and returns a device vector containing all populated keys in
* `map`.
*/
rmm::device_uvector<size_type> extract_populated_keys(map_type const& map,
template <typename ComparatorType>
rmm::device_uvector<size_type> extract_populated_keys(map_type<ComparatorType> const& map,
size_type num_keys,
rmm::cuda_stream_view stream)
{
Expand Down Expand Up @@ -566,52 +576,63 @@ std::unique_ptr<table> groupby(table_view const& keys,
auto preprocessed_keys = cudf::experimental::row::hash::preprocessed_table::create(keys, stream);
auto const comparator = cudf::experimental::row::equality::self_comparator{preprocessed_keys};
auto const row_hash = cudf::experimental::row::hash::row_hasher{std::move(preprocessed_keys)};
auto const d_key_equal = comparator.equal_to(has_null, null_keys_are_equal);
auto const d_row_hash = row_hash.device_hasher(has_null);

size_type constexpr unused_key{std::numeric_limits<size_type>::max()};
size_type constexpr unused_value{std::numeric_limits<size_type>::max()};

using allocator_type = typename map_type::allocator_type;

auto map = map_type::create(compute_hash_table_size(num_keys),
stream,
unused_key,
unused_value,
d_row_hash,
d_key_equal,
allocator_type());

// Cache of sparse results where the location of aggregate value in each
// column is indexed by the hash map
cudf::detail::result_cache sparse_results(requests.size());

// Compute all single pass aggs first
compute_single_pass_aggs(
keys, requests, &sparse_results, *map, keys_have_nulls, include_null_keys, stream);

// Extract the populated indices from the hash map and create a gather map.
// Gathering using this map from sparse results will give dense results.
auto gather_map = extract_populated_keys(*map, keys.num_rows(), stream);

// Compact all results from sparse_results and insert into cache
sparse_to_dense_results(keys,
requests,
&sparse_results,
cache,
gather_map,
*map,
keys_have_nulls,
include_null_keys,
stream,
mr);

return cudf::detail::gather(keys,
gather_map,
out_of_bounds_policy::DONT_CHECK,
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
mr);
auto const comparator_helper = [&](auto const d_key_equal) {
using allocator_type = typename map_type<decltype(d_key_equal)>::allocator_type;

auto const map = map_type<decltype(d_key_equal)>::create(compute_hash_table_size(num_keys),
stream,
unused_key,
unused_value,
d_row_hash,
d_key_equal,
allocator_type());
// Compute all single pass aggs first
compute_single_pass_aggs(
keys, requests, &sparse_results, *map, keys_have_nulls, include_null_keys, stream);

// Extract the populated indices from the hash map and create a gather map.
// Gathering using this map from sparse results will give dense results.
auto gather_map = extract_populated_keys(*map, keys.num_rows(), stream);

// Compact all results from sparse_results and insert into cache
sparse_to_dense_results(keys,
requests,
&sparse_results,
cache,
gather_map,
*map,
keys_have_nulls,
include_null_keys,
stream,
mr);

return cudf::detail::gather(keys,
gather_map,
out_of_bounds_policy::DONT_CHECK,
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
mr);
};

if (cudf::detail::has_nested_columns(keys)) {
auto const d_key_equal = comparator.equal_to<true>(has_null, null_keys_are_equal);

return comparator_helper(d_key_equal);

} else {
auto const d_key_equal = comparator.equal_to<false>(has_null, null_keys_are_equal);

return comparator_helper(d_key_equal);
divyegala marked this conversation as resolved.
Show resolved Hide resolved
}
}

} // namespace
Expand Down
Loading