Skip to content

Commit

Permalink
Refactor distinct with hashset-based algorithms (#15984)
Browse files Browse the repository at this point in the history
Refactor **distinct** algorithm to use `cuco::static_set`.

Authors:
  - Srinivas Yadav (https://github.com/srinivasyadav18)
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15984
  • Loading branch information
srinivasyadav18 authored Jun 27, 2024
1 parent 563556e commit 6eac920
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 229 deletions.
146 changes: 61 additions & 85 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,62 @@
#include "distinct_helpers.hpp"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cuda/functional>
#include <thrust/copy.h>
#include <thrust/distance.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>

#include <utility>
#include <vector>

namespace cudf {
namespace detail {
namespace {
/**
* @brief Invokes the given `func` with desired the row equality
*
* @tparam HasNested Flag indicating whether there are nested columns in the input
* @tparam Func Type of the helper function doing `distinct` check
*
* @param compare_nulls Control whether nulls should be compared as equal or not
* @param compare_nans Control whether floating-point NaNs values should be compared as equal or not
* @param has_nulls Flag indicating whether the input has nulls or not
* @param row_equal Self table comparator
* @param func The input functor to invoke
*/
template <bool HasNested, typename Func>
rmm::device_uvector<cudf::size_type> dipatch_row_equal(
null_equality compare_nulls,
nan_equality compare_nans,
bool has_nulls,
cudf::experimental::row::equality::self_comparator row_equal,
Func&& func)
{
if (compare_nans == nan_equality::ALL_EQUAL) {
auto const d_equal = row_equal.equal_to<HasNested>(
nullate::DYNAMIC{has_nulls},
compare_nulls,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator{});
return func(d_equal);
} else {
auto const d_equal = row_equal.equal_to<HasNested>(
nullate::DYNAMIC{has_nulls},
compare_nulls,
cudf::experimental::row::equality::physical_equality_comparator{});
return func(d_equal);
}
}
} // namespace

rmm::device_uvector<size_type> distinct_indices(table_view const& input,
duplicate_keep_option keep,
Expand All @@ -47,97 +81,39 @@ rmm::device_uvector<size_type> distinct_indices(table_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.num_rows() == 0 or input.num_columns() == 0) {
auto const num_rows = input.num_rows();

if (num_rows == 0 or input.num_columns() == 0) {
return rmm::device_uvector<size_type>(0, stream, mr);
}

auto map = hash_map_type{compute_hash_table_size(input.num_rows()),
cuco::empty_key{-1},
cuco::empty_value{std::numeric_limits<size_type>::min()},
cudf::detail::cuco_allocator{stream},
stream.value()};

auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(input, stream);
auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
auto const has_nested_columns = cudf::detail::has_nested_columns(input);

auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const key_hasher = row_hasher.device_hasher(has_nulls);

auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const pair_iter = cudf::detail::make_counting_transform_iterator(
size_type{0},
cuda::proclaim_return_type<cuco::pair<size_type, size_type>>(
[] __device__(size_type const i) { return cuco::make_pair(i, i); }));

auto const insert_keys = [&](auto const value_comp) {
if (has_nested_columns) {
auto const key_equal = row_comp.equal_to<true>(has_nulls, nulls_equal, value_comp);
map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
} else {
auto const key_equal = row_comp.equal_to<false>(has_nulls, nulls_equal, value_comp);
map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
}
auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const helper_func = [&](auto const& d_equal) {
using RowHasher = std::decay_t<decltype(d_equal)>;
auto set = hash_set_type<RowHasher>{num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_equal,
{row_hash.device_hasher(has_nulls)},
{},
{},
cudf::detail::cuco_allocator{stream},
stream.value()};
return detail::reduce_by_row(set, num_rows, keep, stream, mr);
};

if (nans_equal == nan_equality::ALL_EQUAL) {
using nan_equal_comparator =
cudf::experimental::row::equality::nan_equal_physical_equality_comparator;
insert_keys(nan_equal_comparator{});
if (cudf::detail::has_nested_columns(input)) {
return dipatch_row_equal<true>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
} else {
using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator;
insert_keys(nan_unequal_comparator{});
return dipatch_row_equal<false>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
}

auto output_indices = rmm::device_uvector<size_type>(map.get_size(), stream, mr);

// If we don't care about order, just gather indices of distinct keys taken from map.
if (keep == duplicate_keep_option::KEEP_ANY) {
map.retrieve_all(output_indices.begin(), thrust::make_discard_iterator(), stream.value());
return output_indices;
}

// For other keep options, reduce by row on rows that compare equal.
auto const reduction_results = reduce_by_row(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());

// Extract the desired output indices from reduction results.
auto const map_end = [&] {
if (keep == duplicate_keep_option::KEEP_NONE) {
// Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`.
// Thus, we only output index of the rows in the groups having group size of `1`.
return thrust::copy_if(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.num_rows()),
output_indices.begin(),
[reduction_results = reduction_results.begin()] __device__(
auto const idx) { return reduction_results[idx] == size_type{1}; });
}

// Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in
// each group of equal rows (which are the desired output indices), or the value given by
// `reduction_init_value()`.
return thrust::copy_if(rmm::exec_policy(stream),
reduction_results.begin(),
reduction_results.end(),
output_indices.begin(),
[init_value = reduction_init_value(keep)] __device__(auto const idx) {
return idx != init_value;
});
}();

output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream);
return output_indices;
}

std::unique_ptr<table> distinct(table_view const& input,
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/stream_compaction/distinct_count.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/

#include "stream_compaction_common.cuh"
#include "stream_compaction_common.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/hashing/detail/helper_functions.cuh>
#include <cudf/stream_compaction.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_view.hpp>
Expand Down
Loading

0 comments on commit 6eac920

Please sign in to comment.