From 6eac9207ca0804aeca64c83c533e16ad5963b0ba Mon Sep 17 00:00:00 2001 From: Srinivas Yadav <43375352+srinivasyadav18@users.noreply.github.com> Date: Wed, 26 Jun 2024 17:16:57 -0700 Subject: [PATCH] Refactor distinct with hashset-based algorithms (#15984) Refactor **distinct** algorithm to use `cuco::static_set`. Authors: - Srinivas Yadav (https://github.com/srinivasyadav18) - Yunsong Wang (https://github.com/PointKernel) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/15984 --- cpp/src/stream_compaction/distinct.cu | 146 ++++++-------- cpp/src/stream_compaction/distinct_count.cu | 3 +- cpp/src/stream_compaction/distinct_helpers.cu | 189 ++++++++++-------- .../stream_compaction/distinct_helpers.hpp | 58 +++--- .../stream_compaction_common.cuh | 5 +- .../stream_compaction_common.hpp | 35 ---- cpp/src/stream_compaction/unique.cu | 1 - 7 files changed, 208 insertions(+), 229 deletions(-) delete mode 100644 cpp/src/stream_compaction/stream_compaction_common.hpp diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index a6f15cc49ec..e5cf29f3ebf 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -17,28 +17,62 @@ #include "distinct_helpers.hpp" #include +#include #include #include #include #include +#include #include #include #include +#include +#include #include #include -#include -#include -#include -#include -#include - #include #include namespace cudf { namespace detail { +namespace { +/** + * @brief Invokes the given `func` with desired the row equality + * + * @tparam HasNested Flag indicating whether there are nested columns in the input + * @tparam Func Type of the helper function doing `distinct` check + * + * @param compare_nulls Control whether nulls should be compared as equal or not + * @param compare_nans Control whether floating-point NaNs values should be compared as equal or not + * @param has_nulls Flag indicating whether the input has nulls or not + * @param row_equal Self table comparator + * @param func The input functor to invoke + */ +template +rmm::device_uvector dipatch_row_equal( + null_equality compare_nulls, + nan_equality compare_nans, + bool has_nulls, + cudf::experimental::row::equality::self_comparator row_equal, + Func&& func) +{ + if (compare_nans == nan_equality::ALL_EQUAL) { + auto const d_equal = row_equal.equal_to( + nullate::DYNAMIC{has_nulls}, + compare_nulls, + cudf::experimental::row::equality::nan_equal_physical_equality_comparator{}); + return func(d_equal); + } else { + auto const d_equal = row_equal.equal_to( + nullate::DYNAMIC{has_nulls}, + compare_nulls, + cudf::experimental::row::equality::physical_equality_comparator{}); + return func(d_equal); + } +} +} // namespace rmm::device_uvector distinct_indices(table_view const& input, duplicate_keep_option keep, @@ -47,97 +81,39 @@ rmm::device_uvector distinct_indices(table_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (input.num_rows() == 0 or input.num_columns() == 0) { + auto const num_rows = input.num_rows(); + + if (num_rows == 0 or input.num_columns() == 0) { return rmm::device_uvector(0, stream, mr); } - auto map = hash_map_type{compute_hash_table_size(input.num_rows()), - cuco::empty_key{-1}, - cuco::empty_value{std::numeric_limits::min()}, - cudf::detail::cuco_allocator{stream}, - stream.value()}; - auto const preprocessed_input = cudf::experimental::row::hash::preprocessed_table::create(input, stream); auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)}; auto const has_nested_columns = cudf::detail::has_nested_columns(input); - auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = row_hasher.device_hasher(has_nulls); - - auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); - - auto const pair_iter = cudf::detail::make_counting_transform_iterator( - size_type{0}, - cuda::proclaim_return_type>( - [] __device__(size_type const i) { return cuco::make_pair(i, i); })); - - auto const insert_keys = [&](auto const value_comp) { - if (has_nested_columns) { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value()); - } else { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value()); - } + auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input); + auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input); + + auto const helper_func = [&](auto const& d_equal) { + using RowHasher = std::decay_t; + auto set = hash_set_type{num_rows, + 0.5, // desired load factor + cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, + d_equal, + {row_hash.device_hasher(has_nulls)}, + {}, + {}, + cudf::detail::cuco_allocator{stream}, + stream.value()}; + return detail::reduce_by_row(set, num_rows, keep, stream, mr); }; - if (nans_equal == nan_equality::ALL_EQUAL) { - using nan_equal_comparator = - cudf::experimental::row::equality::nan_equal_physical_equality_comparator; - insert_keys(nan_equal_comparator{}); + if (cudf::detail::has_nested_columns(input)) { + return dipatch_row_equal(nulls_equal, nans_equal, has_nulls, row_equal, helper_func); } else { - using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; - insert_keys(nan_unequal_comparator{}); + return dipatch_row_equal(nulls_equal, nans_equal, has_nulls, row_equal, helper_func); } - - auto output_indices = rmm::device_uvector(map.get_size(), stream, mr); - - // If we don't care about order, just gather indices of distinct keys taken from map. - if (keep == duplicate_keep_option::KEEP_ANY) { - map.retrieve_all(output_indices.begin(), thrust::make_discard_iterator(), stream.value()); - return output_indices; - } - - // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); - - // Extract the desired output indices from reduction results. - auto const map_end = [&] { - if (keep == duplicate_keep_option::KEEP_NONE) { - // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. - // Thus, we only output index of the rows in the groups having group size of `1`. - return thrust::copy_if(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(input.num_rows()), - output_indices.begin(), - [reduction_results = reduction_results.begin()] __device__( - auto const idx) { return reduction_results[idx] == size_type{1}; }); - } - - // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in - // each group of equal rows (which are the desired output indices), or the value given by - // `reduction_init_value()`. - return thrust::copy_if(rmm::exec_policy(stream), - reduction_results.begin(), - reduction_results.end(), - output_indices.begin(), - [init_value = reduction_init_value(keep)] __device__(auto const idx) { - return idx != init_value; - }); - }(); - - output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); - return output_indices; } std::unique_ptr distinct(table_view const& input, diff --git a/cpp/src/stream_compaction/distinct_count.cu b/cpp/src/stream_compaction/distinct_count.cu index 99ca89cc021..9843bb889f4 100644 --- a/cpp/src/stream_compaction/distinct_count.cu +++ b/cpp/src/stream_compaction/distinct_count.cu @@ -15,16 +15,17 @@ */ #include "stream_compaction_common.cuh" -#include "stream_compaction_common.hpp" #include #include #include +#include #include #include #include #include #include +#include #include #include #include diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index 13e89b15bb7..c3a004b7f28 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -16,96 +16,127 @@ #include "distinct_helpers.hpp" -#include - -#include +#include +#include namespace cudf::detail { -namespace { -/** - * @brief The functor to find the first/last/all duplicate row for rows that compared equal. - */ -template -struct reduce_fn : reduce_by_row_fn_base { - duplicate_keep_option const keep; - - reduce_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : reduce_by_row_fn_base{d_map, - d_hasher, - d_equal, - d_output}, - keep{keep} - { +template +rmm::device_uvector reduce_by_row(hash_set_type& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto output_indices = rmm::device_uvector(num_rows, stream, mr); + + // If we don't care about order, just gather indices of distinct keys taken from set. + if (keep == duplicate_keep_option::KEEP_ANY) { + auto const iter = thrust::counting_iterator{0}; + set.insert_async(iter, iter + num_rows, stream.value()); + auto const output_end = set.retrieve_all(output_indices.begin(), stream.value()); + output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream); + return output_indices; } - __device__ void operator()(size_type const idx) const - { - auto const out_ptr = this->get_output_ptr(idx); - - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - atomicMin(out_ptr, idx); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - atomicMax(out_ptr, idx); - } else { - // Count the number of rows in each group of rows that are compared equal. - atomicAdd(out_ptr, size_type{1}); + auto reduction_results = rmm::device_uvector(num_rows, stream, mr); + thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), + reduction_results.begin(), + reduction_results.end(), + reduction_init_value(keep)); + + auto set_ref = set.ref(cuco::op::insert_and_find); + + thrust::for_each(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + [set_ref, keep, reduction_results = reduction_results.begin()] __device__( + size_type const idx) mutable { + auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx); + + auto ref = cuda::atomic_ref{ + reduction_results[*inserted_idx_ptr]}; + if (keep == duplicate_keep_option::KEEP_FIRST) { + // Store the smallest index of all rows that are equal. + ref.fetch_min(idx, cuda::memory_order_relaxed); + } else if (keep == duplicate_keep_option::KEEP_LAST) { + // Store the greatest index of all rows that are equal. + ref.fetch_max(idx, cuda::memory_order_relaxed); + } else { + // Count the number of rows in each group of rows that are compared equal. + ref.fetch_add(size_type{1}, cuda::memory_order_relaxed); + } + }); + + auto const map_end = [&] { + if (keep == duplicate_keep_option::KEEP_NONE) { + // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. + // Thus, we only output index of the rows in the groups having group size of `1`. + return thrust::copy_if( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + output_indices.begin(), + cuda::proclaim_return_type( + [reduction_results = reduction_results.begin()] __device__(auto const idx) { + return reduction_results[idx] == size_type{1}; + })); } - } -}; -/** - * @brief The builder to construct an instance of `reduce_fn` functor base on the given - * value of the `duplicate_keep_option` member variable. - */ -struct reduce_func_builder { - duplicate_keep_option const keep; - - template - auto build(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - size_type* const d_output) - { - return reduce_fn{d_map, d_hasher, d_equal, keep, d_output}; - } -}; + // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in + // each group of equal rows (which are the desired output indices), or the value given by + // `reduction_init_value()`. + return thrust::copy_if( + rmm::exec_policy(stream), + reduction_results.begin(), + reduction_results.end(), + output_indices.begin(), + cuda::proclaim_return_type([init_value = reduction_init_value(keep)] __device__( + auto const idx) { return idx != init_value; })); + }(); -} // namespace + output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); + return output_indices; +} -// This function is split from `distinct.cu` to improve compile time. -rmm::device_uvector reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, - "This function should not be called with KEEP_ANY"); - - return hash_reduce_by_row(map, - preprocessed_input, - num_rows, - has_nulls, - has_nested_columns, - nulls_equal, - nans_equal, - reduce_func_builder{keep}, - reduction_init_value(keep), - stream, - mr); -} + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); } // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_helpers.hpp b/cpp/src/stream_compaction/distinct_helpers.hpp index 40f97e00ce5..fca67c98873 100644 --- a/cpp/src/stream_compaction/distinct_helpers.hpp +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -14,8 +14,7 @@ * limitations under the License. */ -#include "stream_compaction_common.hpp" - +#include #include #include #include @@ -24,6 +23,12 @@ #include #include +#include +#include +#include +#include +#include + namespace cudf::detail { /** @@ -42,13 +47,28 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) } } +template +using hash_set_type = + cuco::static_set, + cuda::thread_scope_device, + RowHasher, + cuco::linear_probing<1, + cudf::experimental::row::hash::device_row_hasher< + cudf::hashing::detail::default_hash, + cudf::nullate::DYNAMIC>>, + cudf::detail::cuco_allocator, + cuco::storage<1>>; + /** - * @brief Perform a reduction on groups of rows that are compared equal. + * @brief Perform a reduction on groups of rows that are compared equal and returns output indices + * of the occurrences of the distinct elements based on `keep` parameter. * * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared - * equal. A hash table is used to find groups of equal rows. + * equal. A hash set is used to find groups of equal rows. * * Depending on the `keep` parameter, the reduction operation for each row group is: + * - If `keep == KEEP_ANY` : order does not matter. * - If `keep == KEEP_FIRST`: min of row indices in the group. * - If `keep == KEEP_LAST`: max of row indices in the group. * - If `keep == KEEP_NONE`: count of equivalent rows (group size). @@ -59,30 +79,18 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * the `reduction_init_value()` function. Then, the reduction result for each row group is written * into the output array at the index of an unspecified row in the group. * - * @param map The auxiliary map to perform reduction - * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row - * comparisons + * @param set The auxiliary set to perform reduction + * @param set_size The number of elements in set * @param num_rows The number of all input rows - * @param has_nulls Indicate whether the input rows has any nulls at any nested levels - * @param has_nested_columns Indicates whether the input table has any nested columns * @param keep The parameter to determine what type of reduction to perform - * @param nulls_equal Flag to specify whether null elements should be considered as equal - * @param nans_equal Flag to specify whether NaN values in floating point column should be - * considered equal. * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned vector - * @return A device_uvector containing the reduction results + * @return A device_uvector containing the output indices */ -rmm::device_uvector reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); - +template +rmm::device_uvector reduce_by_row(hash_set_type& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); } // namespace cudf::detail diff --git a/cpp/src/stream_compaction/stream_compaction_common.cuh b/cpp/src/stream_compaction/stream_compaction_common.cuh index 839672d6a56..0f9bc18e258 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.cuh +++ b/cpp/src/stream_compaction/stream_compaction_common.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +15,8 @@ */ #pragma once -#include "stream_compaction_common.hpp" - #include +#include #include #include diff --git a/cpp/src/stream_compaction/stream_compaction_common.hpp b/cpp/src/stream_compaction/stream_compaction_common.hpp deleted file mode 100644 index 13795f49781..00000000000 --- a/cpp/src/stream_compaction/stream_compaction_common.hpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include - -#include -#include - -#include - -namespace cudf { -namespace detail { - -using hash_map_type = cuco::legacy:: - static_map; - -} // namespace detail -} // namespace cudf diff --git a/cpp/src/stream_compaction/unique.cu b/cpp/src/stream_compaction/unique.cu index c1f8b17938c..edb47984d13 100644 --- a/cpp/src/stream_compaction/unique.cu +++ b/cpp/src/stream_compaction/unique.cu @@ -15,7 +15,6 @@ */ #include "stream_compaction_common.cuh" -#include "stream_compaction_common.hpp" #include #include