Merge branch 'branch-24.08' into remove-host_parse_nested_json-impl
vuule authored Jun 28, 2024
2 parents 7c43a94 + 3c3edfe commit dd74cc6
Showing 8 changed files with 191 additions and 121 deletions.
22 changes: 21 additions & 1 deletion cpp/include/cudf/detail/distinct_hash_join.cuh
@@ -42,6 +42,9 @@ template <typename Equal>
struct comparator_adapter {
comparator_adapter(Equal const& d_equal) : _d_equal{d_equal} {}

// suppress "function was declared but never referenced" warning
#pragma nv_diagnostic push
#pragma nv_diag_suppress 177
__device__ constexpr auto operator()(
cuco::pair<hash_value_type, lhs_index_type> const&,
cuco::pair<hash_value_type, lhs_index_type> const&) const noexcept
@@ -50,6 +53,14 @@ struct comparator_adapter {
return false;
}

__device__ constexpr auto operator()(
cuco::pair<hash_value_type, rhs_index_type> const&,
cuco::pair<hash_value_type, rhs_index_type> const&) const noexcept
{
// All build table keys are distinct, thus `false` no matter what
return false;
}

__device__ constexpr auto operator()(
cuco::pair<hash_value_type, lhs_index_type> const& lhs,
cuco::pair<hash_value_type, rhs_index_type> const& rhs) const noexcept
@@ -58,6 +69,15 @@
return _d_equal(lhs.second, rhs.second);
}

__device__ constexpr auto operator()(
cuco::pair<hash_value_type, rhs_index_type> const& lhs,
cuco::pair<hash_value_type, lhs_index_type> const& rhs) const noexcept
{
if (lhs.first != rhs.first) { return false; }
return _d_equal(lhs.second, rhs.second);
}
#pragma nv_diagnostic pop

private:
Equal _d_equal;
};
@@ -94,7 +114,7 @@ struct distinct_hash_join {
using cuco_storage_type = cuco::storage<1>;

/// Hash table type
- using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, lhs_index_type>,
+ using hash_table_type = cuco::static_set<cuco::pair<hash_value_type, rhs_index_type>,
cuco::extent<size_type>,
cuda::thread_scope_device,
comparator_adapter<d_equal_type>,
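Why four overloads? cuco's static_set can invoke the key comparator with its arguments in either order, and after this change the stored keys are tagged rhs_index_type (build side) while probe keys are tagged lhs_index_type, so every tag combination needs an operator(). The same-tag overloads return false unconditionally because all build-table keys are known to be distinct; at least one of them is never referenced at all, which is what the diagnostic 177 suppression above silences. A minimal standalone sketch of the strong-tag idea (hypothetical types, not the cudf headers):

#include <cstdint>

enum class lhs_index_type : int32_t {};  // probe-table row offset
enum class rhs_index_type : int32_t {};  // build-table row offset

struct tagged_equal {
  // build vs. build: build keys are distinct by construction, never equal
  constexpr bool operator()(rhs_index_type, rhs_index_type) const { return false; }
  // probe vs. build, in either argument order, defers to a row comparison
  // (an integer comparison stands in for the real row comparator here)
  constexpr bool operator()(lhs_index_type l, rhs_index_type r) const
  {
    return static_cast<int32_t>(l) == static_cast<int32_t>(r);
  }
  constexpr bool operator()(rhs_index_type l, lhs_index_type r) const
  {
    return static_cast<int32_t>(l) == static_cast<int32_t>(r);
  }
};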
5 changes: 3 additions & 2 deletions cpp/src/join/conditional_join.cu
@@ -95,7 +95,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
join_size = size.value(stream);
}

- rmm::device_scalar<size_type> write_index(0, stream);
+ rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

@@ -232,13 +232,14 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
}

- rmm::device_scalar<size_type> write_index(0, stream);
+ rmm::device_scalar<std::size_t> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

auto const& join_output_l = left_indices->data();
auto const& join_output_r = right_indices->data();

if (has_nulls) {
conditional_join<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
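Both write_index declarations above widen the counter from cudf::size_type (32-bit) to std::size_t: the kernels below accumulate output counts with 64-bit atomics, so the host-side scalar must match. A minimal sketch of the round trip, assuming a working rmm/CUDA setup (the kernel and sizes are hypothetical):

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>
#include <cuda/atomic>
#include <cstddef>

__global__ void count_all(std::size_t* counter, std::size_t n)
{
  auto const tid = static_cast<std::size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid < n) {
    // one relaxed 64-bit increment per in-range thread
    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*counter};
    ref.fetch_add(1, cuda::memory_order_relaxed);
  }
}

void example(rmm::cuda_stream_view stream)
{
  rmm::device_scalar<std::size_t> write_index(0, stream);  // zero-initialized on device
  count_all<<<256, 256, 0, stream.value()>>>(write_index.data(), 65536);
  std::size_t const total = write_index.value(stream);  // copies back and syncs the stream
}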
124 changes: 114 additions & 10 deletions cpp/src/join/conditional_join_kernels.cuh
@@ -29,6 +29,110 @@
namespace cudf {
namespace detail {

/**
* @brief Adds a pair of indices to the shared memory cache
*
* @param[in] first The first index in the pair
* @param[in] second The second index in the pair
* @param[in,out] current_idx_shared Pointer to shared index that determines
* where in the shared memory cache the pair will be written
* @param[in] warp_id The ID of the warp of the calling thread
* @param[out] joined_shared_l Pointer to the shared memory cache for left indices
* @param[out] joined_shared_r Pointer to the shared memory cache for right indices
*/
__inline__ __device__ void add_pair_to_cache(size_type const first,
size_type const second,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l,
size_type* joined_shared_r)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
// It's guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

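/**
 * @brief Adds a left row index to the shared memory cache (used by the
 * semi/anti join path, which emits no right-hand indices)
 *
 * @param[in] first The left index to add
 * @param[in,out] current_idx_shared Pointer to shared index that determines
 * where in the shared memory cache the index will be written
 * @param[in] warp_id The ID of the warp of the calling thread
 * @param[out] joined_shared_l Pointer to the shared memory cache for left indices
 */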
__inline__ __device__ void add_left_to_cache(size_type const first,
std::size_t* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
cuda::atomic_ref<std::size_t, cuda::thread_scope_block> ref{*(current_idx_shared + warp_id)};
std::size_t my_current_idx = ref.fetch_add(1, cuda::memory_order_relaxed);
joined_shared_l[my_current_idx] = first;
}

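/**
 * @brief Flushes a warp's shared-memory cache of joined index pairs to the
 * global output buffers
 *
 * Lane 0 reserves a contiguous range in the global output with a single
 * atomic add, the base offset is broadcast to the rest of the warp, and the
 * active lanes then cooperatively copy the cached pairs, dropping anything
 * past max_size.
 */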
template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type join_shared_r[num_warps][output_cache_size],
size_type* join_output_l,
size_type* join_output_r)
{
// count how many active threads are participating here, which could be fewer than warp_size
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

// No warp sync is necessary here because we are assuming that ShuffleIndex
// is internally using post-CUDA 9.0 synchronization-safe primitives
// (__shfl_sync instead of __shfl). __shfl is technically not guaranteed to
// be safe by the compiler because it is not required by the standard to
// converge divergent branches before executing.
output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
join_output_r[thread_offset] = join_shared_r[warp_id][shared_out_idx];
}
}
}

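/**
 * @brief Overload of flush_output_cache for the semi/anti join path: flushes
 * only the left-index cache, since no right-hand indices are produced
 */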
template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
std::size_t const max_size,
int const warp_id,
int const lane_id,
std::size_t* current_idx,
std::size_t current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
std::size_t output_offset = 0;

if (0 == lane_id) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*current_idx};
output_offset = ref.fetch_add(current_idx_shared[warp_id], cuda::memory_order_relaxed);
}

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (std::size_t shared_out_idx = static_cast<std::size_t>(lane_id);
shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
std::size_t thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}
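Both flush functions follow the warp-aggregated pattern: one atomic per warp reserves a contiguous output range, then the lanes fan out the copies. A self-contained sketch of the same pattern, assuming launches with a multiple of 32 threads and n a multiple of the block size so warps never split (hypothetical kernel, not the cudf code):

#include <cub/util_ptx.cuh>
#include <cuda/atomic>
#include <cstddef>

__global__ void keep_even(int const* in, std::size_t n, int* out, std::size_t* count)
{
  std::size_t const tid = static_cast<std::size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid >= n) { return; }  // safe: by assumption whole warps exit together

  int const lane_id = static_cast<int>(threadIdx.x) % 32;
  bool const keep   = (in[tid] % 2) == 0;

  // which lanes have a result, and where each lane lands within the warp's range
  unsigned const keep_mask = __ballot_sync(0xffffffff, keep);
  int const warp_total     = __popc(keep_mask);
  int const rank           = __popc(keep_mask & ((1u << lane_id) - 1));

  // lane 0 reserves space for the whole warp with a single atomic
  std::size_t base = 0;
  if (lane_id == 0) {
    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*count};
    base = ref.fetch_add(warp_total, cuda::memory_order_relaxed);
  }
  // broadcast the base offset from lane 0, as flush_output_cache does
  base = cub::ShuffleIndex<32>(base, 0, 0xffffffff);

  if (keep) { out[base + rank] = in[tid]; }
}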

/**
* @brief Computes the output size of joining the left table to the right table.
*
@@ -103,14 +207,14 @@ CUDF_KERNEL void compute_conditional_join_output_size(
}
}

- using BlockReduce = cub::BlockReduce<cudf::size_type, block_size>;
+ using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

// Add block counter to global counter
if (threadIdx.x == 0) {
cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
- ref.fetch_add(block_counter, cuda::std::memory_order_relaxed);
+ ref.fetch_add(block_counter, cuda::memory_order_relaxed);
}
}

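The hunk above widens the block reduction to std::size_t so large per-block match counts cannot overflow before reaching the global 64-bit counter. A minimal self-contained kernel with the same reduce-then-one-atomic shape (hypothetical, not the cudf kernel):

#include <cub/block/block_reduce.cuh>
#include <cuda/atomic>
#include <cstddef>

template <int block_size>
__global__ void count_in_range(std::size_t* output_size, std::size_t n)
{
  // grid-stride loop: each thread tallies its share of "matches"
  std::size_t thread_counter = 0;
  for (std::size_t i = threadIdx.x + static_cast<std::size_t>(blockIdx.x) * block_size; i < n;
       i += static_cast<std::size_t>(block_size) * gridDim.x) {
    ++thread_counter;  // stand-in for "predicate held for this row pair"
  }

  // one block-wide sum, then one atomic add per block
  using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
  __shared__ typename BlockReduce::TempStorage temp_storage;
  std::size_t const block_counter = BlockReduce(temp_storage).Sum(thread_counter);

  if (threadIdx.x == 0) {
    cuda::atomic_ref<std::size_t, cuda::thread_scope_device> ref{*output_size};
    ref.fetch_add(block_counter, cuda::memory_order_relaxed);
  }
}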
@@ -143,13 +247,13 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* join_output_r,
- cudf::size_type* current_idx,
+ std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
- cudf::size_type const max_size,
+ std::size_t const max_size,
bool const swap_tables)
{
constexpr int num_warps = block_size / detail::warp_size;
- __shared__ cudf::size_type current_idx_shared[num_warps];
+ __shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];
__shared__ cudf::size_type join_shared_r[num_warps][output_cache_size];

@@ -183,7 +287,7 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,

if (outer_row_index < outer_num_rows) {
bool found_match = false;
- for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+ for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();
auto const left_row_index = swap_tables ? inner_row_index : outer_row_index;
@@ -277,12 +381,12 @@ CUDF_KERNEL void conditional_join_anti_semi(
table_device_view right_table,
join_kind join_type,
cudf::size_type* join_output_l,
- cudf::size_type* current_idx,
+ std::size_t* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
- cudf::size_type const max_size)
+ std::size_t const max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
- __shared__ cudf::size_type current_idx_shared[num_warps];
+ __shared__ std::size_t current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

extern __shared__ char raw_intermediate_storage[];
Expand Down Expand Up @@ -310,7 +414,7 @@ CUDF_KERNEL void conditional_join_anti_semi(
for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
outer_row_index += stride) {
bool found_match = false;
- for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
+ for (cudf::thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

10 changes: 5 additions & 5 deletions cpp/src/join/distinct_hash_join.cu
@@ -54,7 +54,7 @@ auto prepare_device_equal(
cudf::null_equality compare_nulls)
{
auto const two_table_equal =
- cudf::experimental::row::equality::two_table_comparator(build, probe);
+ cudf::experimental::row::equality::two_table_comparator(probe, build);
return comparator_adapter{two_table_equal.equal_to<HasNested == cudf::has_nested::YES>(
nullate::DYNAMIC{has_nulls}, compare_nulls)};
}
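Swapping the arguments to two_table_comparator is the counterpart of the tag change: the comparator's left table is now the probe table, so lhs_index_type indexes probe rows and rhs_index_type indexes build rows, which is exactly the argument order comparator_adapter forwards. The resulting call shape (assumed tag semantics of the cudf row-equality API, as used in this diff):

// lhs_index_type indexes the first table passed in (probe),
// rhs_index_type the second (build) -- assumed tag semantics
auto const d_equal = two_table_equal.equal_to<HasNested == cudf::has_nested::YES>(
  nullate::DYNAMIC{has_nulls}, compare_nulls);
// d_equal(lhs_index_type{i}, rhs_index_type{j})  ==>  probe row i vs. build row j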
@@ -113,7 +113,7 @@ distinct_hash_join<HasNested>::distinct_hash_join(cudf::table_view const& build,
_hash_table{build.num_rows(),
CUCO_DESIRED_LOAD_FACTOR,
cuco::empty_key{cuco::pair{std::numeric_limits<hash_value_type>::max(),
- lhs_index_type{JoinNoneValue}}},
+ rhs_index_type{JoinNoneValue}}},
prepare_device_equal<HasNested>(
_preprocessed_build, _preprocessed_probe, has_nulls, compare_nulls),
{},
@@ -131,7 +131,7 @@ distinct_hash_join<HasNested>::distinct_hash_join(cudf::table_view const& build,
auto const d_hasher = row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});

auto const iter = cudf::detail::make_counting_transform_iterator(
- 0, build_keys_fn<decltype(d_hasher), lhs_index_type>{d_hasher});
+ 0, build_keys_fn<decltype(d_hasher), rhs_index_type>{d_hasher});

size_type const build_table_num_rows{build.num_rows()};
if (this->_nulls_equal == cudf::null_equality::EQUAL or (not cudf::nullable(this->_build))) {
@@ -174,7 +174,7 @@ distinct_hash_join<HasNested>::inner_join(rmm::cuda_stream_view stream,
cudf::experimental::row::hash::row_hasher{this->_preprocessed_probe};
auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
auto const iter = cudf::detail::make_counting_transform_iterator(
- 0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});
+ 0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});

auto const build_indices_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
@@ -216,7 +216,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> distinct_hash_join<HasNested>::l
cudf::experimental::row::hash::row_hasher{this->_preprocessed_probe};
auto const d_probe_hasher = probe_row_hasher.device_hasher(nullate::DYNAMIC{this->_has_nulls});
auto const iter = cudf::detail::make_counting_transform_iterator(
- 0, build_keys_fn<decltype(d_probe_hasher), rhs_index_type>{d_probe_hasher});
+ 0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});

auto const output_begin =
thrust::make_transform_output_iterator(build_indices->begin(), output_fn{});
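With these changes the tagging is consistent end to end in this file: the build phase inserts pairs tagged rhs_index_type, every probe path generates pairs tagged lhs_index_type, and comparator_adapter accepts both orders. The two iterator shapes, copied from the hunks above (iterator names changed here for contrast):

// build side: keys inserted into the hash table carry the rhs tag
auto const build_iter = cudf::detail::make_counting_transform_iterator(
  0, build_keys_fn<decltype(d_hasher), rhs_index_type>{d_hasher});

// probe side: keys carry the lhs tag (used by both probe hunks above)
auto const probe_iter = cudf::detail::make_counting_transform_iterator(
  0, build_keys_fn<decltype(d_probe_hasher), lhs_index_type>{d_probe_hasher});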