Skip to content

Commit

Permalink
Add distinct left join (#15149)
Browse files Browse the repository at this point in the history
Contributes to #14948

This PR adds distinct left join. It also cleans up the distinct inner code to use the terms "build" and "probe" consistently instead of "left" and "right".

Authors:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Jason Lowe (https://github.com/jlowe)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15149
  • Loading branch information
PointKernel authored Mar 6, 2024
1 parent b5bc531 commit aabfd83
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 19 deletions.
58 changes: 51 additions & 7 deletions cpp/benchmarks/join/distinct_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,44 @@ void distinct_inner_join(nvbench::state& state,
{
skip_helper(state);

auto join = [](cudf::table_view const& left_input,
cudf::table_view const& right_input,
auto join = [](cudf::table_view const& build_input,
cudf::table_view const& probe_input,
cudf::null_equality compare_nulls,
rmm::cuda_stream_view stream) {
auto const has_nulls = cudf::has_nested_nulls(left_input) || cudf::has_nested_nulls(right_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
left_input, right_input, has_nulls, compare_nulls, stream};
auto const has_nulls =
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls, stream};
return hj_obj.inner_join(stream);
};

BM_join<key_type, payload_type, Nullable>(state, join);
}

template <typename key_type, typename payload_type, bool Nullable>
void distinct_left_join(nvbench::state& state,
nvbench::type_list<key_type, payload_type, nvbench::enum_type<Nullable>>)
{
skip_helper(state);

auto join = [](cudf::table_view const& build_input,
cudf::table_view const& probe_input,
cudf::null_equality compare_nulls,
rmm::cuda_stream_view stream) {
auto const has_nulls =
cudf::has_nested_nulls(build_input) || cudf::has_nested_nulls(probe_input)
? cudf::nullable_join::YES
: cudf::nullable_join::NO;
auto hj_obj = cudf::distinct_hash_join<cudf::has_nested::NO>{
build_input, probe_input, has_nulls, compare_nulls, stream};
return hj_obj.left_join(stream);
};

BM_join<key_type, payload_type, Nullable>(state, join);
}

// inner join -----------------------------------------------------------------------
NVBENCH_BENCH_TYPES(distinct_inner_join,
NVBENCH_TYPE_AXES(nvbench::type_list<nvbench::int32_t>,
Expand Down Expand Up @@ -75,3 +98,24 @@ NVBENCH_BENCH_TYPES(distinct_inner_join,
.set_type_axes_names({"Key Type", "Payload Type", "Nullable"})
.add_int64_axis("Build Table Size", {40'000'000, 50'000'000})
.add_int64_axis("Probe Table Size", {50'000'000, 120'000'000});

// left join ------------------------------------------------------------------------
NVBENCH_BENCH_TYPES(distinct_left_join,
NVBENCH_TYPE_AXES(nvbench::type_list<nvbench::int32_t>,
nvbench::type_list<nvbench::int32_t>,
nvbench::enum_type_list<false>))
.set_name("distinct_left_join_32bit")
.set_type_axes_names({"Key Type", "Payload Type", "Nullable"})
.add_int64_axis("Build Table Size", {100'000, 10'000'000, 80'000'000, 100'000'000})
.add_int64_axis("Probe Table Size",
{100'000, 400'000, 10'000'000, 40'000'000, 100'000'000, 240'000'000});

NVBENCH_BENCH_TYPES(distinct_left_join,
NVBENCH_TYPE_AXES(nvbench::type_list<nvbench::int32_t>,
nvbench::type_list<nvbench::int32_t>,
nvbench::enum_type_list<true>))
.set_name("distinct_left_join_32bit_nulls")
.set_type_axes_names({"Key Type", "Payload Type", "Nullable"})
.add_int64_axis("Build Table Size", {100'000, 10'000'000, 80'000'000, 100'000'000})
.add_int64_axis("Probe Table Size",
{100'000, 400'000, 10'000'000, 40'000'000, 100'000'000, 240'000'000});
6 changes: 6 additions & 0 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,11 @@ struct distinct_hash_join {
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
inner_join(rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const;

/**
* @copydoc cudf::distinct_hash_join::left_join
*/
std::unique_ptr<rmm::device_uvector<size_type>> left_join(
rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const;
};
} // namespace cudf::detail
20 changes: 19 additions & 1 deletion cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ class distinct_hash_join {
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* Returns the row indices that can be used to construct the result of performing
* @brief Returns the row indices that can be used to construct the result of performing
* an inner join between two tables. @see cudf::inner_join().
*
* @param stream CUDA stream used for device memory operations and kernel launches
Expand All @@ -500,6 +500,24 @@ class distinct_hash_join {
inner_join(rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const;

/**
* @brief Returns the build table indices that can be used to construct the result of performing
* a left join between two tables.
*
* @note For a given row index `i` of the probe table, the resulting `build_indices[i]` contains
* the row index of the matched row from the build table if there is a match. Otherwise, contains
* `JoinNoneValue`.
*
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table and columns' device
* memory.
* @return A `build_indices` column that can be used to construct the result of performing a left
* join between two tables with `build` and `probe` as the join keys.
*/
std::unique_ptr<rmm::device_uvector<size_type>> left_join(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const;

private:
using impl_type = typename cudf::detail::distinct_hash_join<HasNested>; ///< Implementation type

Expand Down
80 changes: 73 additions & 7 deletions cpp/src/join/distinct_hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include <cooperative_groups.h>
#include <cub/block/block_scan.cuh>
#include <cuco/static_set.cuh>
#include <thrust/fill.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/sequence.h>

#include <cstddef>
#include <limits>
Expand Down Expand Up @@ -76,6 +79,18 @@ class build_keys_fn {
Hasher _hash;
};

/**
* @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
* lhs_index_type>`
*/
struct output_fn {
__device__ constexpr cudf::size_type operator()(
cuco::pair<hash_value_type, lhs_index_type> const& x) const
{
return static_cast<cudf::size_type>(x.second);
}
};

template <typename Tile>
__device__ void flush_buffer(Tile const& tile,
cudf::size_type tile_count,
Expand Down Expand Up @@ -306,9 +321,9 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
}

auto left_indices =
auto build_indices =
std::make_unique<rmm::device_uvector<size_type>>(probe_table_num_rows, stream, mr);
auto right_indices =
auto probe_indices =
std::make_unique<rmm::device_uvector<size_type>>(probe_table_num_rows, stream, mr);

auto const probe_row_hasher =
Expand All @@ -325,14 +340,50 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
probe_table_num_rows,
this->_hash_table.ref(cuco::find),
counter.data(),
left_indices->data(),
right_indices->data());
build_indices->data(),
probe_indices->data());

auto const actual_size = counter.value(stream);
left_indices->resize(actual_size, stream);
right_indices->resize(actual_size, stream);
build_indices->resize(actual_size, stream);
probe_indices->resize(actual_size, stream);

return {std::move(build_indices), std::move(probe_indices)};
}

template <cudf::has_nested HasNested>
std::unique_ptr<rmm::device_uvector<size_type>> distinct_hash_join<HasNested>::left_join(
rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const
{
cudf::thread_range range{"distinct_hash_join::left_join"};

size_type const probe_table_num_rows{this->_probe.num_rows()};

// If output size is zero, return empty
if (probe_table_num_rows == 0) {
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
}

auto build_indices =
std::make_unique<rmm::device_uvector<size_type>>(probe_table_num_rows, stream, mr);

// If build table is empty, return probe table
if (this->_build.num_rows() == 0) {
thrust::fill(
rmm::exec_policy_nosync(stream), build_indices->begin(), build_indices->end(), JoinNoneValue);
} else {
auto const probe_row_hasher =
cudf::experimental::row::hash::row_hasher{this->_preprocessed_probe};
auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
auto const iter = cudf::detail::make_counting_transform_iterator(
0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});

auto const output_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
// TODO conditional find for nulls once `cuco::static_set::find_if` is added
this->_hash_table.find_async(iter, iter + probe_table_num_rows, output_begin, stream.value());
}

return {std::move(left_indices), std::move(right_indices)};
return build_indices;
}
} // namespace detail

Expand Down Expand Up @@ -381,4 +432,19 @@ distinct_hash_join<cudf::has_nested::NO>::inner_join(rmm::cuda_stream_view strea
{
return _impl->inner_join(stream, mr);
}

template <>
std::unique_ptr<rmm::device_uvector<size_type>>
distinct_hash_join<cudf::has_nested::YES>::left_join(rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const
{
return _impl->left_join(stream, mr);
}

template <>
std::unique_ptr<rmm::device_uvector<size_type>> distinct_hash_join<cudf::has_nested::NO>::left_join(
rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const
{
return _impl->left_join(stream, mr);
}
} // namespace cudf
Loading

0 comments on commit aabfd83

Please sign in to comment.