Improve distinct join with set retrieve (#15636)
This PR updates the distinct join to use `static_set::retrieve` instead of the custom device code.
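
For context, here is a minimal standalone sketch of the host-bulk `retrieve` call this change adopts. The `int` keys, toy capacity, sentinel, and data below are assumptions for illustration only — not the `distinct_hash_join` configuration, which stores hash/index pairs behind a custom hasher and equality:

```cpp
#include <cuco/static_set.cuh>

#include <thrust/device_vector.h>

#include <cstddef>
#include <vector>

int main()
{
  // Assumed toy setup: int keys, capacity 1000, -1 as the empty-slot sentinel.
  cuco::static_set<int> set{cuco::extent<std::size_t>{1000}, cuco::empty_key{-1}};

  std::vector<int> const h_build{1, 2, 3, 4, 5};
  thrust::device_vector<int> build(h_build);
  set.insert(build.begin(), build.end());

  std::vector<int> const h_queries{2, 4, 6, 8};
  thrust::device_vector<int> queries(h_queries);
  thrust::device_vector<int> out_probe(queries.size());
  thrust::device_vector<int> out_match(queries.size());

  // For every query found in the set, retrieve writes the query key and the
  // matching stored key, and returns past-the-end iterators for both outputs.
  auto const [probe_end, match_end] =
    set.retrieve(queries.begin(), queries.end(), out_probe.begin(), out_match.begin());

  auto const num_matches = probe_end - out_probe.begin();  // 2 and 4 match -> 2
  return num_matches == 2 ? 0 : 1;
}
```

The diff below follows the same shape: probe keys go in, and matching (probe, build) index pairs come out through transform-output iterators, replacing the hand-written probe kernel and its flush buffers.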

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - David Wendt (https://github.com/davidwendt)
  - Nghia Truong (https://github.com/ttnghia)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Karthikeyan (https://github.com/karthikeyann)
  - Shruti Shivakumar (https://github.com/shrshi)

URL: #15636
PointKernel authored May 6, 2024
1 parent 4ce6674 commit bc3071e
Showing 2 changed files with 19 additions and 185 deletions.
12 changes: 3 additions & 9 deletions cpp/include/cudf/detail/distinct_hash_join.cuh
```diff
@@ -85,16 +85,10 @@ struct hasher_adapter {
 template <cudf::has_nested HasNested>
 struct distinct_hash_join {
  private:
-  /// Row equality type for nested columns
-  using nested_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
-    cudf::experimental::row::equality::device_row_comparator<true, cudf::nullate::DYNAMIC>>;
-  /// Row equality type for flat columns
-  using flat_row_equal = cudf::experimental::row::equality::strong_index_comparator_adapter<
-    cudf::experimental::row::equality::device_row_comparator<false, cudf::nullate::DYNAMIC>>;
-
   /// Device row equal type
-  using d_equal_type =
-    std::conditional_t<HasNested == cudf::has_nested::YES, nested_row_equal, flat_row_equal>;
+  using d_equal_type = cudf::experimental::row::equality::strong_index_comparator_adapter<
+    cudf::experimental::row::equality::device_row_comparator<HasNested == cudf::has_nested::YES,
+                                                             cudf::nullate::DYNAMIC>>;
   using hasher              = hasher_adapter<thrust::identity<hash_value_type>>;
   using probing_scheme_type = cuco::linear_probing<1, hasher>;
   using cuco_storage_type   = cuco::storage<1>;
```
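
The alias change above is a pure type-level simplification: the branch on `HasNested` moves from a `std::conditional_t` over two named aliases into the comparator's boolean template argument. A standalone sketch of the pattern with toy stand-in types (not the cudf ones):

```cpp
#include <type_traits>

// Stand-in for device_row_comparator<HasNested, ...>.
template <bool HasNestedColumns>
struct row_comparator {};

enum class has_nested : bool { NO = false, YES = true };

// Before: two named aliases selected via std::conditional_t.
template <has_nested N>
using equal_before = std::conditional_t<N == has_nested::YES,
                                        row_comparator<true>,
                                        row_comparator<false>>;

// After: the comparison itself is the non-type template argument.
template <has_nested N>
using equal_after = row_comparator<N == has_nested::YES>;

static_assert(std::is_same_v<equal_before<has_nested::YES>, equal_after<has_nested::YES>>);
static_assert(std::is_same_v<equal_before<has_nested::NO>, equal_after<has_nested::NO>>);
```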
192 changes: 16 additions & 176 deletions cpp/src/join/distinct_hash_join.cu
```diff
@@ -46,8 +46,6 @@ namespace cudf {
 namespace detail {
 namespace {
 
-static auto constexpr DISTINCT_JOIN_BLOCK_SIZE = 256;
-
 template <cudf::has_nested HasNested>
 auto prepare_device_equal(
   std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> build,
```
```diff
@@ -82,175 +80,20 @@ class build_keys_fn {
 
 /**
  * @brief Device output transform functor to construct `size_type` with `cuco::pair<hash_value_type,
- * lhs_index_type>`
+ * lhs_index_type>` or `cuco::pair<hash_value_type, rhs_index_type>`
  */
 struct output_fn {
   __device__ constexpr cudf::size_type operator()(
     cuco::pair<hash_value_type, lhs_index_type> const& x) const
   {
     return static_cast<cudf::size_type>(x.second);
   }
+  __device__ constexpr cudf::size_type operator()(
+    cuco::pair<hash_value_type, rhs_index_type> const& x) const
+  {
+    return static_cast<cudf::size_type>(x.second);
+  }
 };
-
-template <typename Tile>
-__device__ void flush_buffer(Tile const& tile,
-                             cudf::size_type tile_count,
-                             cuco::pair<cudf::size_type, cudf::size_type>* buffer,
-                             cudf::size_type* counter,
-                             cudf::size_type* build_indices,
-                             cudf::size_type* probe_indices)
-{
-  cudf::size_type offset;
-  auto const lane_id = tile.thread_rank();
-  if (0 == lane_id) { offset = atomicAdd(counter, tile_count); }
-  offset = tile.shfl(offset, 0);
-
-  for (cudf::size_type i = lane_id; i < tile_count; i += tile.size()) {
-    auto const& [build_idx, probe_idx] = buffer[i];
-    *(build_indices + offset + i)      = build_idx;
-    *(probe_indices + offset + i)      = probe_idx;
-  }
-}
-
-__device__ void flush_buffer(cooperative_groups::thread_block const& block,
-                             cudf::size_type buffer_size,
-                             cuco::pair<cudf::size_type, cudf::size_type>* buffer,
-                             cudf::size_type* counter,
-                             cudf::size_type* build_indices,
-                             cudf::size_type* probe_indices)
-{
-  auto i = block.thread_rank();
-  __shared__ cudf::size_type offset;
-
-  if (i == 0) { offset = atomicAdd(counter, buffer_size); }
-  block.sync();
-
-  while (i < buffer_size) {
-    auto const& [build_idx, probe_idx] = buffer[i];
-    *(build_indices + offset + i)      = build_idx;
-    *(probe_indices + offset + i)      = probe_idx;
-
-    i += block.size();
-  }
-}
-
-// TODO: custom kernel to be replaced by cuco::static_set::retrieve
-template <typename Iter, typename HashTable>
-CUDF_KERNEL void distinct_join_probe_kernel(Iter iter,
-                                            cudf::size_type n,
-                                            HashTable hash_table,
-                                            cudf::size_type* counter,
-                                            cudf::size_type* build_indices,
-                                            cudf::size_type* probe_indices)
-{
-  namespace cg = cooperative_groups;
-
-  auto constexpr tile_size   = HashTable::cg_size;
-  auto constexpr window_size = HashTable::window_size;
-
-  auto idx          = cudf::detail::grid_1d::global_thread_id() / tile_size;
-  auto const stride = cudf::detail::grid_1d::grid_stride() / tile_size;
-  auto const block  = cg::this_thread_block();
-
-  // CG-based probing algorithm
-  if constexpr (tile_size != 1) {
-    auto const tile = cg::tiled_partition<tile_size>(block);
-
-    auto constexpr flushing_tile_size = cudf::detail::warp_size / window_size;
-    // random choice to tune
-    auto constexpr flushing_buffer_size = 2 * flushing_tile_size;
-    auto constexpr num_flushing_tiles   = DISTINCT_JOIN_BLOCK_SIZE / flushing_tile_size;
-    auto constexpr max_matches          = flushing_tile_size / tile_size;
-
-    auto const flushing_tile    = cg::tiled_partition<flushing_tile_size>(block);
-    auto const flushing_tile_id = block.thread_rank() / flushing_tile_size;
-
-    __shared__ cuco::pair<cudf::size_type, cudf::size_type>
-      flushing_tile_buffer[num_flushing_tiles][flushing_tile_size];
-    // per flushing-tile counter to track number of filled elements
-    __shared__ cudf::size_type flushing_counter[num_flushing_tiles];
-
-    if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
-    flushing_tile.sync();  // sync still needed since cg.any doesn't imply a memory barrier
-
-    while (flushing_tile.any(idx < n)) {
-      bool active_flag = idx < n;
-      auto const active_flushing_tile =
-        cg::binary_partition<flushing_tile_size>(flushing_tile, active_flag);
-      if (active_flag) {
-        auto const found = hash_table.find(tile, *(iter + idx));
-        if (tile.thread_rank() == 0 and found != hash_table.end()) {
-          auto const offset = atomicAdd_block(&flushing_counter[flushing_tile_id], 1);
-          flushing_tile_buffer[flushing_tile_id][offset] = cuco::pair{
-            static_cast<cudf::size_type>(found->second), static_cast<cudf::size_type>(idx)};
-        }
-      }
-
-      flushing_tile.sync();
-      if (flushing_counter[flushing_tile_id] + max_matches > flushing_buffer_size) {
-        flush_buffer(flushing_tile,
-                     flushing_counter[flushing_tile_id],
-                     flushing_tile_buffer[flushing_tile_id],
-                     counter,
-                     build_indices,
-                     probe_indices);
-        flushing_tile.sync();
-        if (flushing_tile.thread_rank() == 0) { flushing_counter[flushing_tile_id] = 0; }
-        flushing_tile.sync();
-      }
-
-      idx += stride;
-    }  // while
-
-    if (flushing_counter[flushing_tile_id] > 0) {
-      flush_buffer(flushing_tile,
-                   flushing_counter[flushing_tile_id],
-                   flushing_tile_buffer[flushing_tile_id],
-                   counter,
-                   build_indices,
-                   probe_indices);
-    }
-  }
-  // Scalar probing for CG size 1
-  else {
-    using block_scan = cub::BlockScan<cudf::size_type, DISTINCT_JOIN_BLOCK_SIZE>;
-    __shared__ typename block_scan::TempStorage block_scan_temp_storage;
-
-    auto constexpr buffer_capacity = 2 * DISTINCT_JOIN_BLOCK_SIZE;
-    __shared__ cuco::pair<cudf::size_type, cudf::size_type> buffer[buffer_capacity];
-    cudf::size_type buffer_size = 0;
-
-    while (idx - block.thread_rank() < n) {  // the whole thread block falls into the same iteration
-      auto const found     = idx < n ? hash_table.find(*(iter + idx)) : hash_table.end();
-      auto const has_match = found != hash_table.end();
-
-      // Use a whole-block scan to calculate the output location
-      cudf::size_type offset;
-      cudf::size_type block_count;
-      block_scan(block_scan_temp_storage)
-        .ExclusiveSum(static_cast<cudf::size_type>(has_match), offset, block_count);
-
-      if (buffer_size + block_count > buffer_capacity) {
-        flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
-        block.sync();
-        buffer_size = 0;
-      }
-
-      if (has_match) {
-        buffer[buffer_size + offset] = cuco::pair{static_cast<cudf::size_type>(found->second),
-                                                  static_cast<cudf::size_type>(idx)};
-      }
-      buffer_size += block_count;
-      block.sync();
-
-      idx += stride;
-    }  // while
-
-    if (buffer_size > 0) {
-      flush_buffer(block, buffer_size, buffer, counter, build_indices, probe_indices);
-    }
-  }
-}
 }  // namespace
 
 template <cudf::has_nested HasNested>
```
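
The second `output_fn` overload added above exists so that `retrieve` can write `cuco::pair<hash_value_type, rhs_index_type>` results straight into `size_type` buffers. The mechanism, in isolation, is Thrust's transform-output iterator; a toy sketch with assumed key/index types, not the cudf code:

```cpp
#include <thrust/copy.h>
#include <thrust/device_vector.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/pair.h>

#include <vector>

// Keeps only the index half of a (hash, index) pair, as output_fn does above.
struct second_fn {
  __host__ __device__ int operator()(thrust::pair<unsigned, int> const& x) const
  {
    return x.second;
  }
};

int main()
{
  std::vector<thrust::pair<unsigned, int>> const h_pairs{{7u, 0}, {42u, 3}, {9u, 5}};
  thrust::device_vector<thrust::pair<unsigned, int>> pairs(h_pairs);
  thrust::device_vector<int> indices(pairs.size());

  // Every assignment through `out` applies second_fn before storing, so the
  // pairs are narrowed to bare indices as they are written: {0, 3, 5}.
  auto out = thrust::make_transform_output_iterator(indices.begin(), second_fn{});
  thrust::copy(pairs.begin(), pairs.end(), out);
  return 0;
}
```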
```diff
@@ -332,19 +175,16 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
   auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
   auto const iter           = cudf::detail::make_counting_transform_iterator(
     0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});
-  auto counter = rmm::device_scalar<cudf::size_type>{stream};
-  counter.set_value_to_zero_async(stream);
-
-  cudf::detail::grid_1d grid{probe_table_num_rows, DISTINCT_JOIN_BLOCK_SIZE};
-  distinct_join_probe_kernel<<<grid.num_blocks, grid.num_threads_per_block, 0, stream.value()>>>(
-    iter,
-    probe_table_num_rows,
-    this->_hash_table.ref(cuco::find),
-    counter.data(),
-    build_indices->data(),
-    probe_indices->data());
-
-  auto const actual_size = counter.value(stream);
+
+  auto const build_indices_begin =
+    thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
+  auto const probe_indices_begin =
+    thrust::make_transform_output_iterator(probe_indices->begin(), output_fn{});
+
+  auto const [probe_indices_end, _] = this->_hash_table.retrieve(
+    iter, iter + probe_table_num_rows, probe_indices_begin, build_indices_begin, stream.value());
+
+  auto const actual_size = std::distance(probe_indices_begin, probe_indices_end);
   build_indices->resize(actual_size, stream);
   probe_indices->resize(actual_size, stream);
 
```
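
The `iter` built in the hunk above is a counting iterator composed with a hashing functor, so the `(hash, index)` probe keys are generated on the fly rather than materialized in memory. A sketch of that pattern with a placeholder hash (cudf's `build_keys_fn` wraps the real preprocessed row hasher):

```cpp
#include <cuco/pair.cuh>

#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>

// Toy stand-in for build_keys_fn: pairs a row hash with the row's index.
struct make_key_fn {
  __host__ __device__ cuco::pair<unsigned, int> operator()(int i) const
  {
    // Placeholder multiplicative hash; cudf uses the preprocessed row hasher.
    return {static_cast<unsigned>(i) * 2654435761u, i};
  }
};

// Lazily yields (hash(0), 0), (hash(1), 1), ... with no materialized key buffer;
// cudf::detail::make_counting_transform_iterator is this same composition.
inline auto make_probe_keys_iter()
{
  return thrust::make_transform_iterator(thrust::make_counting_iterator(0), make_key_fn{});
}

int main()
{
  auto it          = make_probe_keys_iter();
  auto const first = it[0];  // host-side evaluation of the functor
  return first.second == 0 ? 0 : 1;
}
```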
