From 71c990955ab57dcb1aec0efad9630c91404b2a57 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 23 Feb 2024 11:16:40 -0800 Subject: [PATCH] Add distinct key inner join (#14990) Contributes to #14948 This PR adds a public `cudf::distinct_hash_join` class that provides a fast code path for joins with distinct keys. Only distinct inner join is tackled in the current PR. Authors: - Yunsong Wang (https://github.com/PointKernel) Approvers: - Jason Lowe (https://github.com/jlowe) - Bradley Dice (https://github.com/bdice) - Lawrence Mitchell (https://github.com/wence-) - David Wendt (https://github.com/davidwendt) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/14990 --- cpp/CMakeLists.txt | 1 + cpp/benchmarks/CMakeLists.txt | 2 +- cpp/benchmarks/join/distinct_join.cu | 77 ++++ cpp/include/cudf/detail/cuco_helpers.hpp | 3 + .../cudf/detail/distinct_hash_join.cuh | 153 +++++++ cpp/include/cudf/join.hpp | 70 +++- cpp/src/join/distinct_hash_join.cu | 387 ++++++++++++++++++ cpp/tests/CMakeLists.txt | 2 +- cpp/tests/join/distinct_join_tests.cpp | 307 ++++++++++++++ 9 files changed, 999 insertions(+), 3 deletions(-) create mode 100644 cpp/benchmarks/join/distinct_join.cu create mode 100644 cpp/include/cudf/detail/distinct_hash_join.cuh create mode 100644 cpp/src/join/distinct_hash_join.cu create mode 100644 cpp/tests/join/distinct_join_tests.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index b87582b53c9..5fd6cd3544a 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -434,6 +434,7 @@ add_library( src/jit/util.cpp src/join/conditional_join.cu src/join/cross_join.cu + src/join/distinct_hash_join.cu src/join/hash_join.cu src/join/join.cu src/join/join_utils.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 5a014537de0..ef25278877e 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -164,7 +164,7 @@ ConfigureNVBench( # ################################################################################################## # * join benchmark -------------------------------------------------------------------------------- ConfigureBench(JOIN_BENCH join/left_join.cu join/conditional_join.cu) -ConfigureNVBench(JOIN_NVBENCH join/join.cu join/mixed_join.cu) +ConfigureNVBench(JOIN_NVBENCH join/join.cu join/mixed_join.cu join/distinct_join.cu) # ################################################################################################## # * iterator benchmark ---------------------------------------------------------------------------- diff --git a/cpp/benchmarks/join/distinct_join.cu b/cpp/benchmarks/join/distinct_join.cu new file mode 100644 index 00000000000..cbdb82275ef --- /dev/null +++ b/cpp/benchmarks/join/distinct_join.cu @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "join_common.hpp" + +template +void distinct_inner_join(nvbench::state& state, + nvbench::type_list>) +{ + skip_helper(state); + + auto join = [](cudf::table_view const& left_input, + cudf::table_view const& right_input, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream) { + auto const has_nulls = cudf::has_nested_nulls(left_input) || cudf::has_nested_nulls(right_input) + ? cudf::nullable_join::YES + : cudf::nullable_join::NO; + auto hj_obj = cudf::distinct_hash_join{ + left_input, right_input, has_nulls, compare_nulls, stream}; + return hj_obj.inner_join(stream); + }; + + BM_join(state, join); +} + +// inner join ----------------------------------------------------------------------- +NVBENCH_BENCH_TYPES(distinct_inner_join, + NVBENCH_TYPE_AXES(nvbench::type_list, + nvbench::type_list, + nvbench::enum_type_list)) + .set_name("distinct_inner_join_32bit") + .set_type_axes_names({"Key Type", "Payload Type", "Nullable"}) + .add_int64_axis("Build Table Size", {100'000, 10'000'000, 80'000'000, 100'000'000}) + .add_int64_axis("Probe Table Size", + {100'000, 400'000, 10'000'000, 40'000'000, 100'000'000, 240'000'000}); + +NVBENCH_BENCH_TYPES(distinct_inner_join, + NVBENCH_TYPE_AXES(nvbench::type_list, + nvbench::type_list, + nvbench::enum_type_list)) + .set_name("distinct_inner_join_64bit") + .set_type_axes_names({"Key Type", "Payload Type", "Nullable"}) + .add_int64_axis("Build Table Size", {40'000'000, 50'000'000}) + .add_int64_axis("Probe Table Size", {50'000'000, 120'000'000}); + +NVBENCH_BENCH_TYPES(distinct_inner_join, + NVBENCH_TYPE_AXES(nvbench::type_list, + nvbench::type_list, + nvbench::enum_type_list)) + .set_name("distinct_inner_join_32bit_nulls") + .set_type_axes_names({"Key Type", "Payload Type", "Nullable"}) + .add_int64_axis("Build Table Size", {100'000, 10'000'000, 80'000'000, 100'000'000}) + .add_int64_axis("Probe Table Size", + {100'000, 400'000, 10'000'000, 40'000'000, 100'000'000, 240'000'000}); + +NVBENCH_BENCH_TYPES(distinct_inner_join, + NVBENCH_TYPE_AXES(nvbench::type_list, + nvbench::type_list, + nvbench::enum_type_list)) + .set_name("distinct_inner_join_64bit_nulls") + .set_type_axes_names({"Key Type", "Payload Type", "Nullable"}) + .add_int64_axis("Build Table Size", {40'000'000, 50'000'000}) + .add_int64_axis("Probe Table Size", {50'000'000, 120'000'000}); diff --git a/cpp/include/cudf/detail/cuco_helpers.hpp b/cpp/include/cudf/detail/cuco_helpers.hpp index 5f3c31479de..506f6475637 100644 --- a/cpp/include/cudf/detail/cuco_helpers.hpp +++ b/cpp/include/cudf/detail/cuco_helpers.hpp @@ -21,6 +21,9 @@ namespace cudf::detail { +/// Default load factor for cuco data structures +static double constexpr CUCO_DESIRED_LOAD_FACTOR = 0.5; + /** * @brief Stream-ordered allocator adaptor used for cuco data structures * diff --git a/cpp/include/cudf/detail/distinct_hash_join.cuh b/cpp/include/cudf/detail/distinct_hash_join.cuh new file mode 100644 index 00000000000..7827f861bd8 --- /dev/null +++ b/cpp/include/cudf/detail/distinct_hash_join.cuh @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include + +namespace cudf::detail { + +using cudf::experimental::row::lhs_index_type; +using cudf::experimental::row::rhs_index_type; + +/** + * @brief An comparator adapter wrapping both self comparator and two table comparator + */ +template +struct comparator_adapter { + comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {} + + __device__ constexpr auto operator()( + cuco::pair const&, + cuco::pair const&) const noexcept + { + // All build table keys are distinct thus `false` no matter what + return false; + } + + __device__ constexpr auto operator()( + cuco::pair const& lhs, + cuco::pair const& rhs) const noexcept + { + if (lhs.first != rhs.first) { return false; } + return _d_equal(lhs.second, rhs.second); + } + + private: + Equal _d_equal; +}; + +template +struct hasher_adapter { + hasher_adapter(Hasher const& d_hasher = {}) : _d_hasher{d_hasher} {} + + template + __device__ constexpr auto operator()(cuco::pair const& key) const noexcept + { + return _d_hasher(key.first); + } + + private: + Hasher _d_hasher; +}; + +/** + * @brief Distinct hash join that builds hash table in creation and probes results in subsequent + * `*_join` member functions. + * + * @tparam HasNested Flag indicating whether there are nested columns in build/probe table + */ +template +struct distinct_hash_join { + private: + /// Row equality type for nested columns + using nested_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter< + cudf::experimental::row::equality::device_row_comparator>; + /// Row equality type for flat columns + using flat_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter< + cudf::experimental::row::equality::device_row_comparator>; + + /// Device row equal type + using d_equal_type = + std::conditional_t; + using hasher = hasher_adapter>; + using probing_scheme_type = cuco::linear_probing<1, hasher>; + using cuco_storage_type = cuco::storage<1>; + + /// Hash table type + using hash_table_type = cuco::static_set, + cuco::extent, + cuda::thread_scope_device, + comparator_adapter, + probing_scheme_type, + cudf::detail::cuco_allocator, + cuco_storage_type>; + + bool _has_nulls; ///< true if nulls are present in either build table or probe table + cudf::null_equality _nulls_equal; ///< whether to consider nulls as equal + cudf::table_view _build; ///< input table to build the hash map + cudf::table_view _probe; ///< input table to probe the hash map + std::shared_ptr + _preprocessed_build; ///< input table preprocssed for row operators + std::shared_ptr + _preprocessed_probe; ///< input table preprocssed for row operators + hash_table_type _hash_table; ///< hash table built on `_build` + + public: + distinct_hash_join() = delete; + ~distinct_hash_join() = default; + distinct_hash_join(distinct_hash_join const&) = delete; + distinct_hash_join(distinct_hash_join&&) = delete; + distinct_hash_join& operator=(distinct_hash_join const&) = delete; + distinct_hash_join& operator=(distinct_hash_join&&) = delete; + + /** + * @brief Constructor that internally builds the hash table based on the given `build` table. + * + * @throw cudf::logic_error if the number of columns in `build` table is 0. + * + * @param build The build table, from which the hash table is built + * @param probe The probe table + * @param has_nulls Flag to indicate if any nulls exist in the `build` table or + * any `probe` table that will be used later for join. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches. + */ + distinct_hash_join(cudf::table_view const& build, + cudf::table_view const& probe, + bool has_nulls, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream); + + /** + * @copydoc cudf::distinct_hash_join::inner_join + */ + std::pair>, + std::unique_ptr>> + inner_join(rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; +}; +} // namespace cudf::detail diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 6c50e1d5998..d97dc64ac39 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,13 @@ namespace cudf { +/** + * @brief Enum to indicate whether the distinct join table has nested columns or not + * + * @ingroup column_join + */ +enum class has_nested : bool { YES, NO }; + // forward declaration namespace hashing::detail { template @@ -41,6 +48,9 @@ class MurmurHash3_x86_32; namespace detail { template class hash_join; + +template +class distinct_hash_join; } // namespace detail /** @@ -438,6 +448,64 @@ class hash_join { const std::unique_ptr _impl; }; +/** + * @brief Distinct hash join that builds hash table in creation and probes results in subsequent + * `*_join` member functions + * + * @note Behavior is undefined if the build table contains duplicates. + * @note All NaNs are considered as equal + * + * @tparam HasNested Flag indicating whether there are nested columns in build/probe table + */ +// TODO: `HasNested` to be removed via dispatching +template +class distinct_hash_join { + public: + distinct_hash_join() = delete; + ~distinct_hash_join(); + distinct_hash_join(distinct_hash_join const&) = delete; + distinct_hash_join(distinct_hash_join&&) = delete; + distinct_hash_join& operator=(distinct_hash_join const&) = delete; + distinct_hash_join& operator=(distinct_hash_join&&) = delete; + + /** + * @brief Constructs a distinct hash join object for subsequent probe calls + * + * @param build The build table that contains distinct elements + * @param probe The probe table, from which the keys are probed + * @param has_nulls Flag to indicate if there exists any nulls in the `build` table or + * any `probe` table that will be used later for join + * @param compare_nulls Controls whether null join-key values should match or not + * @param stream CUDA stream used for device memory operations and kernel launches + */ + distinct_hash_join(cudf::table_view const& build, + cudf::table_view const& probe, + nullable_join has_nulls = nullable_join::YES, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream()); + + /** + * Returns the row indices that can be used to construct the result of performing + * an inner join between two tables. @see cudf::inner_join(). + * + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned indices' device memory. + * + * @return A pair of columns [`build_indices`, `probe_indices`] that can be used to construct + * the result of performing an inner join between two tables with `build` and `probe` + * as the join keys. + */ + std::pair>, + std::unique_ptr>> + inner_join(rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + + private: + using impl_type = typename cudf::detail::distinct_hash_join; ///< Implementation type + + std::unique_ptr _impl; ///< Distinct hash join implementation +}; + /** * @brief Returns a pair of row index vectors corresponding to all pairs * of rows between the specified tables where the predicate evaluates to true. diff --git a/cpp/src/join/distinct_hash_join.cu b/cpp/src/join/distinct_hash_join.cu new file mode 100644 index 00000000000..7c834d1a96b --- /dev/null +++ b/cpp/src/join/distinct_hash_join.cu @@ -0,0 +1,387 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "join_common_utils.cuh" +#include "join_common_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace cudf { +namespace detail { +namespace { + +static auto constexpr DISTINCT_JOIN_BLOCK_SIZE = 256; + +template +auto prepare_device_equal( + std::shared_ptr build, + std::shared_ptr probe, + bool has_nulls, + cudf::null_equality compare_nulls) +{ + auto const two_table_equal = + cudf::experimental::row::equality::two_table_comparator(build, probe); + return comparator_adapter{two_table_equal.equal_to( + nullate::DYNAMIC{has_nulls}, compare_nulls)}; +} + +/** + * @brief Device functor to create a pair of {hash_value, row_index} for a given row. + * + * @tparam Hasher The type of internal hasher to compute row hash. + */ +template +class build_keys_fn { + public: + CUDF_HOST_DEVICE build_keys_fn(Hasher const& hash) : _hash{hash} {} + + __device__ __forceinline__ auto operator()(size_type i) const noexcept + { + return cuco::pair{_hash(i), T{i}}; + } + + private: + Hasher _hash; +}; + +template +__device__ void flush_buffer(Tile const& tile, + cudf::size_type tile_count, + cuco::pair* buffer, + cudf::size_type* counter, + cudf::size_type* build_indices, + cudf::size_type* probe_indices) +{ + cudf::size_type offset; + auto const lane_id = tile.thread_rank(); + if (0 == lane_id) { offset = atomicAdd(counter, tile_count); } + offset = tile.shfl(offset, 0); + + for (cudf::size_type i = lane_id; i < tile_count; i += tile.size()) { + auto const& [build_idx, probe_idx] = buffer[i]; + *(build_indices + offset + i) = build_idx; + *(probe_indices + offset + i) = probe_idx; + } +} + +__device__ void flush_buffer(cooperative_groups::thread_block const& block, + cudf::size_type buffer_size, + cuco::pair* buffer, + cudf::size_type* counter, + cudf::size_type* build_indices, + cudf::size_type* probe_indices) +{ + auto i = block.thread_rank(); + __shared__ cudf::size_type offset; + + if (i == 0) { offset = atomicAdd(counter, buffer_size); } + block.sync(); + + while (i < buffer_size) { + auto const& [build_idx, probe_idx] = buffer[i]; + *(build_indices + offset + i) = build_idx; + *(probe_indices + offset + i) = probe_idx; + + i += block.size(); + } +} + +// TODO: custom kernel to be replaced by cuco::static_set::retrieve +template +CUDF_KERNEL void distinct_join_probe_kernel(Iter iter, + cudf::size_type n, + HashTable hash_table, + cudf::size_type* counter, + cudf::size_type* build_indices, + cudf::size_type* probe_indices) +{ + namespace cg = cooperative_groups; + + auto constexpr tile_size = HashTable::cg_size; + auto constexpr window_size = HashTable::window_size; + + auto idx = cudf::detail::grid_1d::global_thread_id() / tile_size; + auto const stride = cudf::detail::grid_1d::grid_stride() / tile_size; + auto const block = cg::this_thread_block(); + + // CG-based probing algorithm + if constexpr (tile_size != 1) { + auto const tile = cg::tiled_partition(block); + + auto constexpr flushing_tile_size = cudf::detail::warp_size / window_size; + // random choice to tune + auto constexpr flushing_buffer_size = 2 * flushing_tile_size; + auto constexpr num_flushing_tiles = DISTINCT_JOIN_BLOCK_SIZE / flushing_tile_size; + auto constexpr max_matches = flushing_tile_size / tile_size; + + auto const flushing_tile = cg::tiled_partition(block); + auto const flushing_tile_id = block.thread_rank() / flushing_tile_size; + + __shared__ cuco::pair + flushing_tile_buffer[num_flushing_tiles][flushing_tile_size]; + // per flushing-tile counter to track number of filled elements + __shared__ cudf::size_type flushing_counter[num_flushing_tiles]; + + if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; } + flushing_tile.sync(); // sync still needed since cg.any doesn't imply a memory barrier + + while (flushing_tile.any(idx < n)) { + bool active_flag = idx < n; + auto const active_flushing_tile = + cg::binary_partition(flushing_tile, active_flag); + if (active_flag) { + auto const found = hash_table.find(tile, *(iter + idx)); + if (tile.thread_rank() == 0 and found != hash_table.end()) { + auto const offset = atomicAdd_block(&flushing_counter[flushing_tile_id], 1); + flushing_tile_buffer[flushing_tile_id][offset] = cuco::pair{ + static_cast(found->second), static_cast(idx)}; + } + } + + flushing_tile.sync(); + if (flushing_counter[flushing_tile_id] + max_matches > flushing_buffer_size) { + flush_buffer(flushing_tile, + flushing_counter[flushing_tile_id], + flushing_tile_buffer[flushing_tile_id], + counter, + build_indices, + probe_indices); + flushing_tile.sync(); + if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; } + flushing_tile.sync(); + } + + idx += stride; + } // while + + if (flushing_counter[flushing_tile_id] > 0) { + flush_buffer(flushing_tile, + flushing_counter[flushing_tile_id], + flushing_tile_buffer[flushing_tile_id], + counter, + build_indices, + probe_indices); + } + } + // Scalar probing for CG size 1 + else { + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage block_scan_temp_storage; + + auto constexpr buffer_capacity = 2 * DISTINCT_JOIN_BLOCK_SIZE; + __shared__ cuco::pair buffer[buffer_capacity]; + cudf::size_type buffer_size = 0; + + while (idx - block.thread_rank() < n) { // the whole thread block falls into the same iteration + cudf::size_type thread_count{0}; + cudf::size_type build_idx{0}; + if (idx < n) { + auto const found = hash_table.find(*(iter + idx)); + thread_count = found != hash_table.end(); + build_idx = static_cast(found->second); + } + + // Use a whole-block scan to calculate the output location + cudf::size_type offset; + cudf::size_type block_count; + block_scan(block_scan_temp_storage).ExclusiveSum(thread_count, offset, block_count); + + if (buffer_size + block_count > buffer_capacity) { + flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices); + block.sync(); + buffer_size = 0; + } + + if (thread_count == 1) { + buffer[buffer_size + offset] = cuco::pair{build_idx, static_cast(idx)}; + } + buffer_size += block_count; + block.sync(); + + idx += stride; + } // while + + if (buffer_size > 0) { + flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices); + } + } +} +} // namespace + +template +distinct_hash_join::distinct_hash_join(cudf::table_view const& build, + cudf::table_view const& probe, + bool has_nulls, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream) + : _has_nulls{has_nulls}, + _nulls_equal{compare_nulls}, + _build{build}, + _probe{probe}, + _preprocessed_build{ + cudf::experimental::row::equality::preprocessed_table::create(_build, stream)}, + _preprocessed_probe{ + cudf::experimental::row::equality::preprocessed_table::create(_probe, stream)}, + _hash_table{build.num_rows(), + CUCO_DESIRED_LOAD_FACTOR, + cuco::empty_key{cuco::pair{std::numeric_limits::max(), + lhs_index_type{JoinNoneValue}}}, + prepare_device_equal( + _preprocessed_build, _preprocessed_probe, has_nulls, compare_nulls), + {}, + cuco::thread_scope_device, + cuco_storage_type{}, + cudf::detail::cuco_allocator{stream}, + stream.value()} +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(0 != this->_build.num_columns(), "Hash join build table is empty"); + + if (this->_build.num_rows() == 0) { return; } + + auto const row_hasher = experimental::row::hash::row_hasher{this->_preprocessed_build}; + auto const d_hasher = row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls}); + + auto const iter = cudf::detail::make_counting_transform_iterator( + 0, build_keys_fn{d_hasher}); + + size_type const build_table_num_rows{build.num_rows()}; + if (this->_nulls_equal == cudf::null_equality::EQUAL or (not cudf::nullable(this->_build))) { + this->_hash_table.insert_async(iter, iter + build_table_num_rows, stream.value()); + } else { + auto stencil = thrust::counting_iterator{0}; + auto const row_bitmask = + cudf::detail::bitmask_and(this->_build, stream, rmm::mr::get_current_device_resource()).first; + auto const pred = + cudf::detail::row_is_valid{reinterpret_cast(row_bitmask.data())}; + + // insert valid rows + this->_hash_table.insert_if_async( + iter, iter + build_table_num_rows, stencil, pred, stream.value()); + } +} + +template +std::pair>, + std::unique_ptr>> +distinct_hash_join::inner_join(rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const +{ + cudf::thread_range range{"distinct_hash_join::inner_join"}; + + size_type const probe_table_num_rows{this->_probe.num_rows()}; + + // If output size is zero, return immediately + if (probe_table_num_rows == 0) { + return std::pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); + } + + auto left_indices = + std::make_unique>(probe_table_num_rows, stream, mr); + auto right_indices = + std::make_unique>(probe_table_num_rows, stream, mr); + + auto const probe_row_hasher = + cudf::experimental::row::hash::row_hasher{this->_preprocessed_probe}; + auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls}); + auto const iter = cudf::detail::make_counting_transform_iterator( + 0, build_keys_fn{d_probe_hasher}); + auto counter = rmm::device_scalar{stream}; + counter.set_value_to_zero_async(stream); + + cudf::detail::grid_1d grid{probe_table_num_rows, DISTINCT_JOIN_BLOCK_SIZE}; + distinct_join_probe_kernel<<>>( + iter, + probe_table_num_rows, + this->_hash_table.ref(cuco::find), + counter.data(), + left_indices->data(), + right_indices->data()); + + auto const actual_size = counter.value(stream); + left_indices->resize(actual_size, stream); + right_indices->resize(actual_size, stream); + + return {std::move(left_indices), std::move(right_indices)}; +} +} // namespace detail + +template <> +distinct_hash_join::~distinct_hash_join() = default; + +template <> +distinct_hash_join::~distinct_hash_join() = default; + +template <> +distinct_hash_join::distinct_hash_join(cudf::table_view const& build, + cudf::table_view const& probe, + nullable_join has_nulls, + null_equality compare_nulls, + rmm::cuda_stream_view stream) + : _impl{std::make_unique( + build, probe, has_nulls == nullable_join::YES, compare_nulls, stream)} +{ +} + +template <> +distinct_hash_join::distinct_hash_join(cudf::table_view const& build, + cudf::table_view const& probe, + nullable_join has_nulls, + null_equality compare_nulls, + rmm::cuda_stream_view stream) + : _impl{std::make_unique( + build, probe, has_nulls == nullable_join::YES, compare_nulls, stream)} +{ +} + +template <> +std::pair>, + std::unique_ptr>> +distinct_hash_join::inner_join(rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const +{ + return _impl->inner_join(stream, mr); +} + +template <> +std::pair>, + std::unique_ptr>> +distinct_hash_join::inner_join(rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const +{ + return _impl->inner_join(stream, mr); +} +} // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 94ae349896c..3e377b07eee 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -152,7 +152,7 @@ ConfigureTest( # * join tests ------------------------------------------------------------------------------------ ConfigureTest( JOIN_TEST join/join_tests.cpp join/conditional_join_tests.cu join/cross_join_tests.cpp - join/semi_anti_join_tests.cpp join/mixed_join_tests.cu + join/semi_anti_join_tests.cpp join/mixed_join_tests.cu join/distinct_join_tests.cpp ) # ################################################################################################## diff --git a/cpp/tests/join/distinct_join_tests.cpp b/cpp/tests/join/distinct_join_tests.cpp new file mode 100644 index 00000000000..27f4c4fdf61 --- /dev/null +++ b/cpp/tests/join/distinct_join_tests.cpp @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +template +using column_wrapper = cudf::test::fixed_width_column_wrapper; +using strcol_wrapper = cudf::test::strings_column_wrapper; +using CVector = std::vector>; +using Table = cudf::table; + +struct DistinctJoinTest : public cudf::test::BaseFixture { + void compare_to_reference( + cudf::table_view const& build_table, + cudf::table_view const& probe_table, + std::pair>, + std::unique_ptr>> const& result, + cudf::table_view const& expected_table) + { + auto const& [build_join_indices, probe_join_indices] = result; + + auto build_indices_span = cudf::device_span{*build_join_indices}; + auto probe_indices_span = cudf::device_span{*probe_join_indices}; + + auto build_indices_col = cudf::column_view{build_indices_span}; + auto probe_indices_col = cudf::column_view{probe_indices_span}; + + auto constexpr oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; + auto joined_cols = cudf::gather(build_table, build_indices_col, oob_policy)->release(); + auto right_cols = cudf::gather(probe_table, probe_indices_col, oob_policy)->release(); + + joined_cols.insert(joined_cols.end(), + std::make_move_iterator(right_cols.begin()), + std::make_move_iterator(right_cols.end())); + auto joined_table = std::make_unique(std::move(joined_cols)); + auto result_sort_order = cudf::sorted_order(joined_table->view()); + auto sorted_joined_table = cudf::gather(joined_table->view(), *result_sort_order); + + auto expected_sort_order = cudf::sorted_order(expected_table); + auto sorted_expected = cudf::gather(expected_table, *expected_sort_order); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_expected, *sorted_joined_table); + } +}; + +TEST_F(DistinctJoinTest, IntegerInnerJoin) +{ + auto constexpr size = 2024; + + auto const init = cudf::numeric_scalar{0}; + + auto build = cudf::sequence(size, init, cudf::numeric_scalar{1}); + auto probe = cudf::sequence(size, init, cudf::numeric_scalar{2}); + + auto build_table = cudf::table_view{{build->view()}}; + auto probe_table = cudf::table_view{{probe->view()}}; + + auto distinct_join = cudf::distinct_hash_join{ + build_table, probe_table, cudf::nullable_join::NO}; + + auto result = distinct_join.inner_join(); + + auto constexpr gold_size = size / 2; + auto gold = cudf::sequence(gold_size, init, cudf::numeric_scalar{2}); + this->compare_to_reference(build_table, probe_table, result, cudf::table_view{{gold->view()}}); +} + +TEST_F(DistinctJoinTest, InnerJoinNoNulls) +{ + column_wrapper col0_0{{1, 2, 3, 4, 5}}; + strcol_wrapper col0_1({"s0", "s0", "s3", "s4", "s5"}); + column_wrapper col0_2{{9, 9, 9, 9, 9}}; + + column_wrapper col1_0{{1, 2, 3, 4, 9}}; + strcol_wrapper col1_1({"s0", "s0", "s0", "s4", "s4"}); + column_wrapper col1_2{{9, 9, 9, 0, 9}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + + Table build(std::move(cols0)); + Table probe(std::move(cols1)); + + auto distinct_join = cudf::distinct_hash_join{build.view(), probe.view()}; + auto result = distinct_join.inner_join(); + + column_wrapper col_gold_0{{1, 2}}; + strcol_wrapper col_gold_1({"s0", "s0"}); + column_wrapper col_gold_2{{9, 9}}; + column_wrapper col_gold_3{{1, 2}}; + strcol_wrapper col_gold_4({"s0", "s0"}); + column_wrapper col_gold_5{{9, 9}}; + CVector cols_gold; + cols_gold.push_back(col_gold_0.release()); + cols_gold.push_back(col_gold_1.release()); + cols_gold.push_back(col_gold_2.release()); + cols_gold.push_back(col_gold_3.release()); + cols_gold.push_back(col_gold_4.release()); + cols_gold.push_back(col_gold_5.release()); + Table gold(std::move(cols_gold)); + + this->compare_to_reference(build.view(), probe.view(), result, gold.view()); +} + +TEST_F(DistinctJoinTest, InnerJoinWithNulls) +{ + column_wrapper col0_0{{3, 1, 2, 0, 2}}; + strcol_wrapper col0_1({"s1", "s1", "s0", "s4", "s0"}, {1, 1, 0, 1, 1}); + column_wrapper col0_2{{1, 1, 2, 4, 1}}; + + column_wrapper col1_0{{1, 2, 0, 2, 3}}; + strcol_wrapper col1_1({"s1", "s0", "s1", "s0", "s1"}); + column_wrapper col1_2{{1, 1, 1, 1, 1}, {0, 1, 1, 0, 1}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + + Table build(std::move(cols0)); + Table probe(std::move(cols1)); + + auto distinct_join = cudf::distinct_hash_join{build.view(), probe.view()}; + auto result = distinct_join.inner_join(); + + column_wrapper col_gold_0{{3, 2}}; + strcol_wrapper col_gold_1({"s1", "s0"}, {1, 1}); + column_wrapper col_gold_2{{1, 1}}; + column_wrapper col_gold_3{{3, 2}}; + strcol_wrapper col_gold_4({"s1", "s0"}, {1, 1}); + column_wrapper col_gold_5{{1, 1}}; + CVector cols_gold; + cols_gold.push_back(col_gold_0.release()); + cols_gold.push_back(col_gold_1.release()); + cols_gold.push_back(col_gold_2.release()); + cols_gold.push_back(col_gold_3.release()); + cols_gold.push_back(col_gold_4.release()); + cols_gold.push_back(col_gold_5.release()); + Table gold(std::move(cols_gold)); + + this->compare_to_reference(build.view(), probe.view(), result, gold.view()); +} + +TEST_F(DistinctJoinTest, InnerJoinWithStructsAndNulls) +{ + column_wrapper col0_0{{3, 1, 2, 0, 2}}; + strcol_wrapper col0_1({"s1", "s1", "s0", "s4", "s0"}, {1, 1, 0, 1, 1}); + column_wrapper col0_2{{0, 1, 2, 4, 4}, {1, 1, 1, 1, 0}}; + std::initializer_list col0_names = { + "Samuel Vimes", "Carrot Ironfoundersson", "Detritus", "Samuel Vimes", "Angua von Überwald"}; + auto col0_names_col = strcol_wrapper{col0_names.begin(), col0_names.end()}; + auto col0_ages_col = column_wrapper{{48, 27, 351, 31, 25}}; + + auto col0_is_human_col = column_wrapper{{true, true, false, false, false}, {1, 1, 0, 1, 0}}; + + auto col0_3 = + cudf::test::structs_column_wrapper{{col0_names_col, col0_ages_col, col0_is_human_col}}; + + column_wrapper col1_0{{2, 2, 0, 4, 3}}; + strcol_wrapper col1_1({"s1", "s0", "s1", "s2", "s1"}); + column_wrapper col1_2{{1, 1, 1, 2, 0}, {1, 0, 1, 1, 1}}; + std::initializer_list col1_names = {"Carrot Ironfoundersson", + "Angua von Überwald", + "Detritus", + "Carrot Ironfoundersson", + "Samuel Vimes"}; + auto col1_names_col = strcol_wrapper{col1_names.begin(), col1_names.end()}; + auto col1_ages_col = column_wrapper{{31, 25, 351, 27, 48}}; + + auto col1_is_human_col = column_wrapper{{true, false, false, false, true}, {1, 0, 0, 1, 1}}; + + auto col1_3 = + cudf::test::structs_column_wrapper{{col1_names_col, col1_ages_col, col1_is_human_col}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols0.push_back(col0_3.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + cols1.push_back(col1_3.release()); + + Table probe(std::move(cols0)); + Table build(std::move(cols1)); + + auto distinct_join = cudf::distinct_hash_join{build.view(), probe.view()}; + auto result = distinct_join.inner_join(); + + column_wrapper col_gold_0{{3, 2}}; + strcol_wrapper col_gold_1({"s1", "s0"}, {1, 1}); + column_wrapper col_gold_2{{0, 4}, {1, 0}}; + auto col_gold_3_names_col = strcol_wrapper{"Samuel Vimes", "Angua von Überwald"}; + auto col_gold_3_ages_col = column_wrapper{{48, 25}}; + + auto col_gold_3_is_human_col = column_wrapper{{true, false}, {1, 0}}; + + auto col_gold_3 = cudf::test::structs_column_wrapper{ + {col_gold_3_names_col, col_gold_3_ages_col, col_gold_3_is_human_col}}; + + column_wrapper col_gold_4{{3, 2}}; + strcol_wrapper col_gold_5({"s1", "s0"}, {1, 1}); + column_wrapper col_gold_6{{0, -1}, {1, 0}}; + auto col_gold_7_names_col = strcol_wrapper{"Samuel Vimes", "Angua von Überwald"}; + auto col_gold_7_ages_col = column_wrapper{{48, 25}}; + + auto col_gold_7_is_human_col = column_wrapper{{true, false}, {1, 0}}; + + auto col_gold_7 = cudf::test::structs_column_wrapper{ + {col_gold_7_names_col, col_gold_7_ages_col, col_gold_7_is_human_col}}; + CVector cols_gold; + cols_gold.push_back(col_gold_0.release()); + cols_gold.push_back(col_gold_1.release()); + cols_gold.push_back(col_gold_2.release()); + cols_gold.push_back(col_gold_3.release()); + cols_gold.push_back(col_gold_4.release()); + cols_gold.push_back(col_gold_5.release()); + cols_gold.push_back(col_gold_6.release()); + cols_gold.push_back(col_gold_7.release()); + Table gold(std::move(cols_gold)); + + this->compare_to_reference(build.view(), probe.view(), result, gold.view()); +} + +TEST_F(DistinctJoinTest, EmptyBuildTableInnerJoin) +{ + column_wrapper col0_0; + column_wrapper col0_1; + + column_wrapper col1_0{{2, 2, 0, 4, 3}}; + column_wrapper col1_1{{1, 0, 1, 2, 1}, {1, 0, 1, 1, 1}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + + Table build(std::move(cols0)); + Table probe(std::move(cols1)); + + auto distinct_join = cudf::distinct_hash_join{build.view(), probe.view()}; + auto result = distinct_join.inner_join(); + + this->compare_to_reference(build.view(), probe.view(), result, build.view()); +} + +TEST_F(DistinctJoinTest, EmptyProbeTableInnerJoin) +{ + column_wrapper col0_0{{2, 2, 0, 4, 3}}; + column_wrapper col0_1{{1, 0, 1, 2, 1}, {1, 0, 1, 1, 1}}; + + column_wrapper col1_0; + column_wrapper col1_1; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + + Table build(std::move(cols0)); + Table probe(std::move(cols1)); + + auto distinct_join = cudf::distinct_hash_join{build.view(), probe.view()}; + auto result = distinct_join.inner_join(); + + this->compare_to_reference(build.view(), probe.view(), result, probe.view()); +}