diff --git a/cpp/include/cudf/detail/distinct_hash_join.cuh b/cpp/include/cudf/detail/distinct_hash_join.cuh index 93d52d5dda3..de3d23e9470 100644 --- a/cpp/include/cudf/detail/distinct_hash_join.cuh +++ b/cpp/include/cudf/detail/distinct_hash_join.cuh @@ -85,16 +85,10 @@ struct hasher_adapter { template struct distinct_hash_join { private: - /// Row equality type for nested columns - using nested_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter< - cudf::experimental::row::equality::device_row_comparator>; - /// Row equality type for flat columns - using flat_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter< - cudf::experimental::row::equality::device_row_comparator>; - /// Device row equal type - using d_equal_type = - std::conditional_t; + using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter< + cudf::experimental::row::equality::device_row_comparator>; using hasher = hasher_adapter>; using probing_scheme_type = cuco::linear_probing<1, hasher>; using cuco_storage_type = cuco::storage<1>; diff --git a/cpp/src/join/distinct_hash_join.cu b/cpp/src/join/distinct_hash_join.cu index a3652942973..ad401bdccba 100644 --- a/cpp/src/join/distinct_hash_join.cu +++ b/cpp/src/join/distinct_hash_join.cu @@ -46,8 +46,6 @@ namespace cudf { namespace detail { namespace { -static auto constexpr DISTINCT_JOIN_BLOCK_SIZE = 256; - template auto prepare_device_equal( std::shared_ptr build, @@ -82,7 +80,7 @@ class build_keys_fn { /** * @brief Device output transform functor to construct `size_type` with `cuco::pair` + * lhs_index_type>` or `cuco::pair` */ struct output_fn { __device__ constexpr cudf::size_type operator()( @@ -90,167 +88,12 @@ struct output_fn { { return static_cast(x.second); } -}; - -template -__device__ void flush_buffer(Tile const& tile, - cudf::size_type tile_count, - cuco::pair* buffer, - cudf::size_type* counter, - cudf::size_type* build_indices, - cudf::size_type* probe_indices) -{ - cudf::size_type offset; - auto const lane_id = tile.thread_rank(); - if (0 == lane_id) { offset = atomicAdd(counter, tile_count); } - offset = tile.shfl(offset, 0); - - for (cudf::size_type i = lane_id; i < tile_count; i += tile.size()) { - auto const& [build_idx, probe_idx] = buffer[i]; - *(build_indices + offset + i) = build_idx; - *(probe_indices + offset + i) = probe_idx; - } -} - -__device__ void flush_buffer(cooperative_groups::thread_block const& block, - cudf::size_type buffer_size, - cuco::pair* buffer, - cudf::size_type* counter, - cudf::size_type* build_indices, - cudf::size_type* probe_indices) -{ - auto i = block.thread_rank(); - __shared__ cudf::size_type offset; - - if (i == 0) { offset = atomicAdd(counter, buffer_size); } - block.sync(); - - while (i < buffer_size) { - auto const& [build_idx, probe_idx] = buffer[i]; - *(build_indices + offset + i) = build_idx; - *(probe_indices + offset + i) = probe_idx; - - i += block.size(); - } -} - -// TODO: custom kernel to be replaced by cuco::static_set::retrieve -template -CUDF_KERNEL void distinct_join_probe_kernel(Iter iter, - cudf::size_type n, - HashTable hash_table, - cudf::size_type* counter, - cudf::size_type* build_indices, - cudf::size_type* probe_indices) -{ - namespace cg = cooperative_groups; - - auto constexpr tile_size = HashTable::cg_size; - auto constexpr window_size = HashTable::window_size; - - auto idx = cudf::detail::grid_1d::global_thread_id() / tile_size; - auto const stride = cudf::detail::grid_1d::grid_stride() / tile_size; - auto const block = cg::this_thread_block(); - - // CG-based probing algorithm - if constexpr (tile_size != 1) { - auto const tile = cg::tiled_partition(block); - - auto constexpr flushing_tile_size = cudf::detail::warp_size / window_size; - // random choice to tune - auto constexpr flushing_buffer_size = 2 * flushing_tile_size; - auto constexpr num_flushing_tiles = DISTINCT_JOIN_BLOCK_SIZE / flushing_tile_size; - auto constexpr max_matches = flushing_tile_size / tile_size; - - auto const flushing_tile = cg::tiled_partition(block); - auto const flushing_tile_id = block.thread_rank() / flushing_tile_size; - - __shared__ cuco::pair - flushing_tile_buffer[num_flushing_tiles][flushing_tile_size]; - // per flushing-tile counter to track number of filled elements - __shared__ cudf::size_type flushing_counter[num_flushing_tiles]; - - if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; } - flushing_tile.sync(); // sync still needed since cg.any doesn't imply a memory barrier - - while (flushing_tile.any(idx < n)) { - bool active_flag = idx < n; - auto const active_flushing_tile = - cg::binary_partition(flushing_tile, active_flag); - if (active_flag) { - auto const found = hash_table.find(tile, *(iter + idx)); - if (tile.thread_rank() == 0 and found != hash_table.end()) { - auto const offset = atomicAdd_block(&flushing_counter[flushing_tile_id], 1); - flushing_tile_buffer[flushing_tile_id][offset] = cuco::pair{ - static_cast(found->second), static_cast(idx)}; - } - } - - flushing_tile.sync(); - if (flushing_counter[flushing_tile_id] + max_matches > flushing_buffer_size) { - flush_buffer(flushing_tile, - flushing_counter[flushing_tile_id], - flushing_tile_buffer[flushing_tile_id], - counter, - build_indices, - probe_indices); - flushing_tile.sync(); - if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; } - flushing_tile.sync(); - } - - idx += stride; - } // while - - if (flushing_counter[flushing_tile_id] > 0) { - flush_buffer(flushing_tile, - flushing_counter[flushing_tile_id], - flushing_tile_buffer[flushing_tile_id], - counter, - build_indices, - probe_indices); - } - } - // Scalar probing for CG size 1 - else { - using block_scan = cub::BlockScan; - __shared__ typename block_scan::TempStorage block_scan_temp_storage; - - auto constexpr buffer_capacity = 2 * DISTINCT_JOIN_BLOCK_SIZE; - __shared__ cuco::pair buffer[buffer_capacity]; - cudf::size_type buffer_size = 0; - - while (idx - block.thread_rank() < n) { // the whole thread block falls into the same iteration - auto const found = idx < n ? hash_table.find(*(iter + idx)) : hash_table.end(); - auto const has_match = found != hash_table.end(); - - // Use a whole-block scan to calculate the output location - cudf::size_type offset; - cudf::size_type block_count; - block_scan(block_scan_temp_storage) - .ExclusiveSum(static_cast(has_match), offset, block_count); - - if (buffer_size + block_count > buffer_capacity) { - flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices); - block.sync(); - buffer_size = 0; - } - - if (has_match) { - buffer[buffer_size + offset] = cuco::pair{static_cast(found->second), - static_cast(idx)}; - } - buffer_size += block_count; - block.sync(); - - idx += stride; - } // while - - if (buffer_size > 0) { - flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices); - } + __device__ constexpr cudf::size_type operator()( + cuco::pair const& x) const + { + return static_cast(x.second); } -} +}; } // namespace template @@ -332,19 +175,16 @@ distinct_hash_join::inner_join(rmm::cuda_stream_view stream, auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls}); auto const iter = cudf::detail::make_counting_transform_iterator( 0, build_keys_fn{d_probe_hasher}); - auto counter = rmm::device_scalar{stream}; - counter.set_value_to_zero_async(stream); - - cudf::detail::grid_1d grid{probe_table_num_rows, DISTINCT_JOIN_BLOCK_SIZE}; - distinct_join_probe_kernel<<>>( - iter, - probe_table_num_rows, - this->_hash_table.ref(cuco::find), - counter.data(), - build_indices->data(), - probe_indices->data()); - - auto const actual_size = counter.value(stream); + + auto const build_indices_begin = + thrust::make_transform_output_iterator(build_indices->begin(), output_fn{}); + auto const probe_indices_begin = + thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{}); + + auto const [probe_indices_end, _] = this->_hash_table.retrieve( + iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, stream.value()); + + auto const actual_size = std::distance(probe_indices_begin, probe_indices_end); build_indices->resize(actual_size, stream); probe_indices->resize(actual_size, stream);