Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor joins for conditional semis and antis #14646

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e9ae4e7
refactor joins for conditional semis and antis to not wastefully allo…
ZelboK Dec 18, 2023
1300bc8
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
ZelboK Dec 20, 2023
6c1030e
save
ZelboK Dec 21, 2023
2605529
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
PointKernel Jan 3, 2024
365e2c6
Update cpp/src/join/conditional_join.cu
DanialJavady96 Jan 3, 2024
0411c85
Update cpp/src/join/conditional_join.cu
DanialJavady96 Jan 3, 2024
c77ea8c
Merge branch 'feat-joins-refactor-semi-anti' of github.com:DanialJava…
ZelboK Jan 3, 2024
20f190e
address comments, clean up
ZelboK Jan 4, 2024
e39c3a0
transition to grid stride
ZelboK Jan 4, 2024
9bf46a6
refactor to grid stride
ZelboK Jan 4, 2024
dfb5e34
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
DanialJavady96 Jan 4, 2024
2d1b9ae
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
bdice Apr 16, 2024
4bcb922
Update cpp/src/join/conditional_join.cu
DanialJavady96 Apr 17, 2024
6f670f8
Update cpp/src/join/conditional_join_kernels.cuh
DanialJavady96 Apr 17, 2024
000d89d
Format and clean up.
ZelboK Apr 17, 2024
0c2307f
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
756bb4c
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
6dfb7d5
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
ebe95f5
Address comments, use async memory resource
ZelboK Apr 18, 2024
4267899
Refactor to use async memory resource
ZelboK Apr 18, 2024
ac864f6
Refactor to async ref mr
ZelboK Apr 18, 2024
a4457f5
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
bdice Apr 18, 2024
26feb6f
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
ZelboK Apr 19, 2024
cb89def
Update cpp/src/join/conditional_join_kernels.cuh
ZelboK Apr 19, 2024
53f23ab
Update cpp/src/join/conditional_join_kernels.cuh
ZelboK Apr 19, 2024
eeecd8a
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
ZelboK Apr 23, 2024
24c1c52
Remove needless iterating over anti/semi joins when condition found.
ZelboK Apr 22, 2024
91b79a4
Formatting
ZelboK Apr 23, 2024
6fae49f
fmt
ZelboK Apr 24, 2024
7541016
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
PointKernel Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 119 additions & 16 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,103 @@
namespace cudf {
namespace detail {

/**
 * @brief Computes the left-table row indices for a conditional left semi or
 * left anti join.
 *
 * A semi join returns the indices of left rows that have at least one match in
 * the right table under `binary_predicate`; an anti join returns the indices
 * of left rows that have no match.
 *
 * @param left The left (outer) table
 * @param right The right (inner) table
 * @param binary_predicate AST expression evaluated per (left, right) row pair;
 * must produce a BOOL8 result
 * @param join_type Must be join_kind::LEFT_SEMI_JOIN or join_kind::LEFT_ANTI_JOIN
 * @param output_size Optional precomputed output size; when absent a sizing
 * kernel is launched first
 * @param stream CUDA stream on which kernels and allocations are ordered
 * @param mr Device memory resource used to allocate the returned vector
 * @return Device vector of qualifying left-table row indices
 */
std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_semi(
  table_view const& left,
  table_view const& right,
  ast::expression const& binary_predicate,
  join_kind join_type,
  std::optional<std::size_t> output_size,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr)
{
  auto right_num_rows{right.num_rows()};
  auto left_num_rows{left.num_rows()};
  if (right_num_rows == 0) {
    switch (join_type) {
      // An anti join against an empty right table matches every left row, so
      // return the full sequence of left indices. (Returning a device_uvector
      // constructed with left_num_rows alone would leave its contents
      // uninitialized.)
      case join_kind::LEFT_ANTI_JOIN:
        return std::move(get_trivial_left_join_indices(left, stream, mr).first);
      case join_kind::LEFT_SEMI_JOIN:
        return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
      default: CUDF_FAIL("Invalid join kind."); break;
    }
  } else if (left_num_rows == 0) {
    switch (join_type) {
      case join_kind::LEFT_ANTI_JOIN:
      case join_kind::LEFT_SEMI_JOIN:
        return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
      default: CUDF_FAIL("Invalid join kind."); break;
    }
  }

  auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream);

  auto const parser =
    ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr};
  CUDF_EXPECTS(parser.output_type().id() == type_id::BOOL8,
               "The expression must produce a boolean output.");

  auto left_table  = table_device_view::create(left, stream);
  auto right_table = table_device_view::create(right, stream);

  detail::grid_1d const config(left_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
  auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

  // If the caller did not supply the output size, run a sizing kernel first so
  // the output buffer can be allocated exactly once.
  std::size_t join_size;
  if (output_size.has_value()) {
    join_size = *output_size;
  } else {
    rmm::device_scalar<std::size_t> size(0, stream, mr);
    if (has_nulls) {
      compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, true>
        <<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
          *left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
    } else {
      compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, false>
        <<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
          *left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
    }
    join_size = size.value(stream);
  }

  if (join_size == 0) { return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr); }

  rmm::device_scalar<size_type> write_index(0, stream);

  // Semi/anti joins only ever emit left-side indices, so no right-side index
  // buffer is allocated (unlike the pair-returning joins).
  auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

  auto const& join_output_l = left_indices->data();
  // Twice the default cache size is used because these joins never cache
  // right-side indices: the shared-memory budget a pair join would spend on
  // the right side is reused to batch more left indices per flush, reducing
  // the frequency of global-memory updates.
  if (has_nulls) {
    conditional_join_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE * 2, true>
      <<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
        *left_table,
        *right_table,
        join_type,
        join_output_l,
        write_index.data(),
        parser.device_expression_data,
        join_size);
  } else {
    conditional_join_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE * 2, false>
      <<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
        *left_table,
        *right_table,
        join_type,
        join_output_l,
        write_index.data(),
        parser.device_expression_data,
        join_size);
  }

  return left_indices;
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
conditional_join(table_view const& left,
Expand All @@ -45,6 +142,14 @@ conditional_join(table_view const& left,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Idk enough about libcudf to know if this code is called directly. I left it here just in case,
// I don't think it should be needed since this function isn't meant to be called directly anyway.
// Will remove on request
DanialJavady96 marked this conversation as resolved.
Show resolved Hide resolved
if (join_type == join_kind::LEFT_ANTI_JOIN || join_type == join_kind::LEFT_SEMI_JOIN) {
return std::make_pair(std::move(conditional_join_semi(
left, right, binary_predicate, join_type, output_size, stream, mr)),
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
}
// We can immediately filter out cases where the right table is empty. In
// some cases, we return all the rows of the left table with a corresponding
// null index for the right table; in others, we return an empty output.
Expand Down Expand Up @@ -347,14 +452,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_semi_join(
rmm::mr::device_memory_resource* mr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☑️ todo: ‏ Please use rmm::device_async_resource_ref (not pointer)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #15498 for details / examples. It should be a straightforward replacement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DanialJavady96 I can launch another round of CI tests once this and the below (line 366) mr is migrated to the new async resource ref.

{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
Expand All @@ -365,14 +469,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::size_t conditional_inner_join_size(table_view const& left,
Expand Down
94 changes: 94 additions & 0 deletions cpp/src/join/conditional_join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,100 @@ __global__ void conditional_join(table_device_view left_table,
}
}

/**
 * @brief Kernel computing the output rows of a conditional left semi or left
 * anti join.
 *
 * Each thread handles one left (outer) row and scans all right (inner) rows,
 * evaluating the AST predicate for each pair. Matching left indices are
 * staged in a per-warp shared-memory cache and flushed to global memory in
 * batches.
 *
 * @tparam block_size Threads per block
 * @tparam output_cache_size Per-warp shared-memory cache capacity
 * @tparam has_nulls Whether the predicate may see null values
 *
 * @param left_table Device view of the left table
 * @param right_table Device view of the right table
 * @param join_type join_kind::LEFT_SEMI_JOIN or join_kind::LEFT_ANTI_JOIN
 * @param join_output_l Global output buffer for left-row indices
 * @param current_idx Global counter of rows already written to the output
 * @param device_expression_data Compiled AST expression data
 * @param max_size Capacity of join_output_l; writes beyond it are dropped
 */
template <cudf::size_type block_size, cudf::size_type output_cache_size, bool has_nulls>
__global__ void conditional_join_semi(
  table_device_view left_table,
  table_device_view right_table,
  join_kind join_type,
  cudf::size_type* join_output_l,
  cudf::size_type* current_idx,
  cudf::ast::detail::expression_device_view device_expression_data,
  cudf::size_type const max_size)
{
  constexpr int num_warps = block_size / detail::warp_size;
  __shared__ cudf::size_type current_idx_shared[num_warps];
  __shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

  // Dynamic shared memory holds per-thread scratch space for AST
  // intermediate results.
  extern __shared__ char raw_intermediate_storage[];
  cudf::ast::detail::IntermediateDataType<has_nulls>* intermediate_storage =
    reinterpret_cast<cudf::ast::detail::IntermediateDataType<has_nulls>*>(raw_intermediate_storage);
  auto thread_intermediate_storage =
    &intermediate_storage[threadIdx.x * device_expression_data.num_intermediates];

  int const warp_id = threadIdx.x / detail::warp_size;
  int const lane_id = threadIdx.x % detail::warp_size;
  cudf::thread_index_type const left_num_rows  = left_table.num_rows();
  cudf::thread_index_type const right_num_rows = right_table.num_rows();
  cudf::thread_index_type const outer_num_rows = left_num_rows;
  cudf::thread_index_type const inner_num_rows = right_num_rows;

  if (0 == lane_id) { current_idx_shared[warp_id] = 0; }

  __syncwarp();

  auto outer_row_index = cudf::detail::grid_1d::global_thread_id();

  unsigned int const activemask = __ballot_sync(0xffff'ffffu, outer_row_index < left_num_rows);

  auto evaluator = cudf::ast::detail::expression_evaluator<has_nulls>(
    left_table, right_table, device_expression_data);

  if (outer_row_index < outer_num_rows) {
    bool found_match = false;
    for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
         ++inner_row_index) {
      // Once a match has been found there is no need to evaluate the predicate
      // for the remaining inner rows. The loop itself must still run to
      // completion, however, so that every lane keeps participating in the
      // warp-synchronous flush logic below.
      if (!found_match) {
        auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

        evaluator.evaluate(
          output_dest, outer_row_index, inner_row_index, 0, thread_intermediate_storage);

        if (output_dest.is_valid() && output_dest.value()) {
          // A semi join emits the row on its first match; an anti join is
          // handled after the loop, once it is known no match exists.
          if (join_type == join_kind::LEFT_SEMI_JOIN) {
            add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
          }
          found_match = true;
        }
      }

      __syncwarp(activemask);

      // Flush the shared-memory cache whenever another full warp of results
      // could overflow it.
      auto const do_flush   = current_idx_shared[warp_id] + detail::warp_size >= output_cache_size;
      auto const flush_mask = __ballot_sync(activemask, do_flush);
      if (do_flush) {
        flush_left_cache<num_warps, output_cache_size>(flush_mask,
                                                       max_size,
                                                       warp_id,
                                                       lane_id,
                                                       current_idx,
                                                       current_idx_shared,
                                                       join_shared_l,
                                                       join_output_l);
        __syncwarp(flush_mask);
        if (0 == lane_id) { current_idx_shared[warp_id] = 0; }
      }
      __syncwarp(activemask);
    }

    if ((join_type == join_kind::LEFT_ANTI_JOIN) && (!found_match)) {
      add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
    }

    __syncwarp(activemask);

    // Final flush of whatever remains in the cache.
    auto const do_flush   = current_idx_shared[warp_id] > 0;
    auto const flush_mask = __ballot_sync(activemask, do_flush);
    if (do_flush) {
      flush_left_cache<num_warps, output_cache_size>(flush_mask,
                                                     max_size,
                                                     warp_id,
                                                     lane_id,
                                                     current_idx,
                                                     current_idx_shared,
                                                     join_shared_l,
                                                     join_output_l);
    }
  }
}

} // namespace detail

} // namespace cudf
37 changes: 36 additions & 1 deletion cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,21 @@ __inline__ __device__ void add_pair_to_cache(size_type const first,
size_type* joined_shared_r)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

// its guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

/**
 * @brief Adds a left-table row index to the calling warp's shared-memory
 * output cache.
 *
 * @param first The left-row index to stage
 * @param current_idx_shared Per-warp counters of cached entries
 * @param warp_id Index of the calling thread's warp within the block
 * @param joined_shared_l This warp's shared-memory cache of left indices
 */
__inline__ __device__ void add_left_to_cache(size_type const first,
                                             size_type* current_idx_shared,
                                             int const warp_id,
                                             size_type* joined_shared_l)
{
  // Atomically claim the next free slot in this warp's cache. The calling
  // kernel flushes the cache before it can overflow, so the slot is in range.
  size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

  joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
Expand Down Expand Up @@ -322,6 +331,32 @@ __device__ void flush_output_cache(unsigned int const activemask,
}
}

/**
 * @brief Cooperatively flushes a warp's shared-memory cache of left-row
 * indices to the global output buffer.
 *
 * @tparam num_warps Number of warps per block
 * @tparam output_cache_size Per-warp cache capacity
 *
 * @param activemask Mask of lanes participating in this flush
 * @param max_size Capacity of join_output_l; writes beyond it are dropped
 * @param warp_id Index of the calling thread's warp within the block
 * @param lane_id Index of the calling thread within its warp
 * @param current_idx Global counter of rows already written to the output
 * @param current_idx_shared Per-warp counters of cached entries
 * @param join_shared_l Per-warp shared-memory caches of left indices
 * @param join_output_l Global output buffer for left-row indices
 */
template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_left_cache(unsigned int const activemask,
                                 cudf::size_type const max_size,
                                 int const warp_id,
                                 int const lane_id,
                                 cudf::size_type* current_idx,
                                 cudf::size_type current_idx_shared[num_warps],
                                 size_type join_shared_l[num_warps][output_cache_size],
                                 size_type* join_output_l)
{
  // Number of lanes cooperating in this flush.
  int num_threads              = __popc(activemask);
  cudf::size_type output_offset = 0;

  // Lane 0 reserves a contiguous region of the global output for the whole
  // warp's cached entries...
  if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

  // ...and broadcasts the reserved offset to the other participating lanes.
  output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

  // Lanes copy cache entries to global memory in a strided pattern; entries
  // that would land past max_size are dropped.
  for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
       shared_out_idx += num_threads) {
    cudf::size_type thread_offset = output_offset + shared_out_idx;
    if (thread_offset < max_size) {
      join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
    }
  }
}

} // namespace detail

} // namespace cudf
Loading