Refactor joins for conditional semis and antis #14646

Merged

Changes from all commits (30 commits):
e9ae4e7
refactor joins for conditional semis and antis to not wastefully allo…
ZelboK Dec 18, 2023
1300bc8
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
ZelboK Dec 20, 2023
6c1030e
save
ZelboK Dec 21, 2023
2605529
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
PointKernel Jan 3, 2024
365e2c6
Update cpp/src/join/conditional_join.cu
DanialJavady96 Jan 3, 2024
0411c85
Update cpp/src/join/conditional_join.cu
DanialJavady96 Jan 3, 2024
c77ea8c
Merge branch 'feat-joins-refactor-semi-anti' of github.com:DanialJava…
ZelboK Jan 3, 2024
20f190e
address comments, clean up
ZelboK Jan 4, 2024
e39c3a0
transition to grid stride
ZelboK Jan 4, 2024
9bf46a6
refactor to grid stride
ZelboK Jan 4, 2024
dfb5e34
Merge branch 'branch-24.02' into feat-joins-refactor-semi-anti
DanialJavady96 Jan 4, 2024
2d1b9ae
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
bdice Apr 16, 2024
4bcb922
Update cpp/src/join/conditional_join.cu
DanialJavady96 Apr 17, 2024
6f670f8
Update cpp/src/join/conditional_join_kernels.cuh
DanialJavady96 Apr 17, 2024
000d89d
Format and clean up.
ZelboK Apr 17, 2024
0c2307f
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
756bb4c
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
6dfb7d5
Update cpp/src/join/conditional_join.cu
ZelboK Apr 18, 2024
ebe95f5
Address comments, use async memory resource
ZelboK Apr 18, 2024
4267899
Refactor to use async memory resource
ZelboK Apr 18, 2024
ac864f6
Refactor to async ref mr
ZelboK Apr 18, 2024
a4457f5
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
bdice Apr 18, 2024
26feb6f
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
ZelboK Apr 19, 2024
cb89def
Update cpp/src/join/conditional_join_kernels.cuh
ZelboK Apr 19, 2024
53f23ab
Update cpp/src/join/conditional_join_kernels.cuh
ZelboK Apr 19, 2024
eeecd8a
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
ZelboK Apr 23, 2024
24c1c52
Remove needless iterating over anti/semi joins when condition found.
ZelboK Apr 22, 2024
91b79a4
Formatting
ZelboK Apr 23, 2024
6fae49f
fmt
ZelboK Apr 24, 2024
7541016
Merge branch 'branch-24.06' into feat-joins-refactor-semi-anti
PointKernel Apr 24, 2024
149 changes: 118 additions & 31 deletions cpp/src/join/conditional_join.cu
@@ -37,6 +37,99 @@
namespace cudf {
namespace detail {

std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
join_kind join_type,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (right.num_rows() == 0) {
switch (join_type) {
case join_kind::LEFT_ANTI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(left.num_rows(), stream, mr);
case join_kind::LEFT_SEMI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left.num_rows() == 0) {
switch (join_type) {
case join_kind::LEFT_ANTI_JOIN: [[fallthrough]];
case join_kind::LEFT_SEMI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
default: CUDF_FAIL("Invalid join kind."); break;
}
}

auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream);

auto const parser =
ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr};
CUDF_EXPECTS(parser.output_type().id() == type_id::BOOL8,
"The expression must produce a Boolean output.");

auto left_table = table_device_view::create(left, stream);
auto right_table = table_device_view::create(right, stream);

detail::grid_1d const config(left.num_rows(), DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

// TODO: Remove the output_size parameter. It is not needed because the
// output size is bounded by the size of the left table.
std::size_t join_size;
if (output_size.has_value()) {
join_size = *output_size;
} else {
// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);
if (has_nulls) {
compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
} else {
compute_conditional_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table, *right_table, join_type, parser.device_expression_data, false, size.data());
}
join_size = size.value(stream);
Comment on lines +85 to +96 (Contributor):

In a follow-up PR, we should adopt the same optimization used in #15288. In that PR, we pre-allocate a vector of indices (with size equal to the left table's size) and then shrink it to fit the actual number of elements written. That means we only have to execute one kernel rather than two.

}

if (left.num_rows() == 0) {
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
}

rmm::device_scalar<size_type> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);

auto const& join_output_l = left_indices->data();

if (has_nulls) {
conditional_join_anti_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table,
*right_table,
join_type,
join_output_l,
write_index.data(),
parser.device_expression_data,
join_size);
} else {
conditional_join_anti_semi<DEFAULT_JOIN_BLOCK_SIZE, DEFAULT_JOIN_CACHE_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_table,
*right_table,
join_type,
join_output_l,
write_index.data(),
parser.device_expression_data,
join_size);
}
return left_indices;
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
conditional_join(table_view const& left,
@@ -50,9 +143,7 @@ conditional_join(table_view const& left,
// We can immediately filter out cases where the right table is empty. In
// some cases, we return all the rows of the left table with a corresponding
// null index for the right table; in others, we return an empty output.
auto right_num_rows{right.num_rows()};
auto left_num_rows{left.num_rows()};
if (right_num_rows == 0) {
if (right.num_rows() == 0) {
switch (join_type) {
// Left, left anti, and full all return all the row indices from left
// with a corresponding NULL from the right.
@@ -67,7 +158,7 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left_num_rows == 0) {
} else if (left.num_rows() == 0) {
switch (join_type) {
// Left, left anti, left semi, and inner joins all return empty sets.
case join_kind::LEFT_JOIN:
@@ -101,8 +192,8 @@ conditional_join(table_view const& left,

// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows());
detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(),
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;
join_kind const kernel_join_type =
@@ -187,7 +278,7 @@ conditional_join(table_view const& left,
// by any row in the left table.
if (join_type == join_kind::FULL_JOIN) {
auto complement_indices = detail::get_left_join_indices_complement(
join_indices.second, left_num_rows, right_num_rows, stream, mr);
join_indices.second, left.num_rows(), right.num_rows(), stream, mr);
join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream);
}
return join_indices;
@@ -210,29 +301,27 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// We can immediately filter out cases where one table is empty. In
// some cases, we return all the rows of the other table with a corresponding
// null index for the empty table; in others, we return an empty output.
auto right_num_rows{right.num_rows()};
auto left_num_rows{left.num_rows()};
if (right_num_rows == 0) {
if (right.num_rows() == 0) {
Comment (Member):

This is the place that @bdice was referring to: https://github.com/rapidsai/cudf/pull/14646/files#r1573011742

Depending on how you see it, we should consistently use either auto const ... or .num_rows() in both functions.

Reply (Contributor):

I'll change the entire file to be consistent.

Reply (Contributor):

Ah, okay. It turns out I was pushing to the wrong fork. That's why there were some inconsistencies. My bad.

switch (join_type) {
// Left, left anti, and full all return all the row indices from left
// with a corresponding NULL from the right.
case join_kind::LEFT_JOIN:
case join_kind::LEFT_ANTI_JOIN:
case join_kind::FULL_JOIN: return left_num_rows;
case join_kind::FULL_JOIN: return left.num_rows();
// Inner and left semi joins return empty output because no matches can exist.
case join_kind::INNER_JOIN:
case join_kind::LEFT_SEMI_JOIN: return 0;
default: CUDF_FAIL("Invalid join kind."); break;
}
} else if (left_num_rows == 0) {
} else if (left.num_rows() == 0) {
switch (join_type) {
// Left, left anti, left semi, and inner joins all return empty sets.
case join_kind::LEFT_JOIN:
case join_kind::LEFT_ANTI_JOIN:
case join_kind::INNER_JOIN:
case join_kind::LEFT_SEMI_JOIN: return 0;
// Full joins need to return the trivial complement.
case join_kind::FULL_JOIN: return right_num_rows;
case join_kind::FULL_JOIN: return right.num_rows();
default: CUDF_FAIL("Invalid join kind."); break;
}
}
@@ -254,8 +343,8 @@

// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right.num_rows() > left.num_rows());
detail::grid_1d const config(swap_tables ? right.num_rows() : left.num_rows(),
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

@@ -349,14 +438,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_semi_join(
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_SEMI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
@@ -367,14 +455,13 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_left_anti_join(
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return std::move(detail::conditional_join(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr)
.first);
return std::move(detail::conditional_join_anti_semi(left,
right,
binary_predicate,
detail::join_kind::LEFT_ANTI_JOIN,
output_size,
cudf::get_default_stream(),
mr));
}

std::size_t conditional_inner_join_size(table_view const& left,
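The follow-up optimization suggested in the review comment above (adopting the approach from #15288) would replace the two launches in conditional_join_anti_semi, one to count and one to gather, with a single launch that writes into a buffer pre-sized to the worst case and is then shrunk. The sketch below is not cudf code; it is a minimal, self-contained CUDA illustration of that pattern, with a hard-coded equality predicate standing in for the AST expression and plain CUDA runtime allocations standing in for rmm.

```cuda
// Hypothetical sketch: one kernel writes matching left-row indices through an
// atomic cursor into a buffer pre-sized to the worst case (left_n); the host
// then keeps only the used prefix ("shrink to fit"). Output order is not sorted.
#include <cuda_runtime.h>
#include <cstdio>
#include <vector>

__global__ void semi_join_indices(int const* left, int left_n,
                                  int const* right, int right_n,
                                  int* out, int* out_count)
{
  auto const stride = static_cast<long long>(blockDim.x) * gridDim.x;
  for (long long i = blockIdx.x * blockDim.x + threadIdx.x; i < left_n; i += stride) {
    bool found = false;
    for (int j = 0; j < right_n && !found; ++j) {
      found = (left[i] == right[j]);  // stand-in for the conditional predicate
    }
    if (found) {
      int const pos = atomicAdd(out_count, 1);  // pos < left_n by construction
      out[pos] = static_cast<int>(i);
    }
  }
}

int main()
{
  std::vector<int> h_left{1, 2, 3, 4, 5}, h_right{2, 4, 6};
  int *d_left, *d_right, *d_out, *d_count;
  cudaMalloc(&d_left, h_left.size() * sizeof(int));
  cudaMalloc(&d_right, h_right.size() * sizeof(int));
  cudaMalloc(&d_out, h_left.size() * sizeof(int));  // pre-allocate to the left table's size
  cudaMalloc(&d_count, sizeof(int));
  cudaMemcpy(d_left, h_left.data(), h_left.size() * sizeof(int), cudaMemcpyHostToDevice);
  cudaMemcpy(d_right, h_right.data(), h_right.size() * sizeof(int), cudaMemcpyHostToDevice);
  cudaMemset(d_count, 0, sizeof(int));

  semi_join_indices<<<1, 128>>>(d_left, (int)h_left.size(), d_right, (int)h_right.size(), d_out, d_count);

  int h_count = 0;
  cudaMemcpy(&h_count, d_count, sizeof(int), cudaMemcpyDeviceToHost);
  std::vector<int> h_out(h_count);  // copy back only the prefix actually written
  cudaMemcpy(h_out.data(), d_out, h_count * sizeof(int), cudaMemcpyDeviceToHost);
  for (int idx : h_out) std::printf("matched left row %d\n", idx);
  cudaFree(d_left); cudaFree(d_right); cudaFree(d_out); cudaFree(d_count);
  return 0;
}
```

Because a semi or anti join emits each left row at most once, the worst-case output size is the left row count, so the pre-allocation can never overflow; the cost is briefly holding the larger buffer until it is shrunk.
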
94 changes: 94 additions & 0 deletions cpp/src/join/conditional_join_kernels.cuh
@@ -271,6 +271,100 @@ CUDF_KERNEL void conditional_join(table_device_view left_table,
}
}

template <cudf::size_type block_size, cudf::size_type output_cache_size, bool has_nulls>
CUDF_KERNEL void conditional_join_anti_semi(
table_device_view left_table,
table_device_view right_table,
join_kind join_type,
cudf::size_type* join_output_l,
cudf::size_type* current_idx,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ cudf::size_type current_idx_shared[num_warps];
__shared__ cudf::size_type join_shared_l[num_warps][output_cache_size];

extern __shared__ char raw_intermediate_storage[];
cudf::ast::detail::IntermediateDataType<has_nulls>* intermediate_storage =
reinterpret_cast<cudf::ast::detail::IntermediateDataType<has_nulls>*>(raw_intermediate_storage);
auto thread_intermediate_storage =
&intermediate_storage[threadIdx.x * device_expression_data.num_intermediates];

int const warp_id = threadIdx.x / detail::warp_size;
int const lane_id = threadIdx.x % detail::warp_size;
cudf::thread_index_type const outer_num_rows = left_table.num_rows();
cudf::thread_index_type const inner_num_rows = right_table.num_rows();
auto const stride = cudf::detail::grid_1d::grid_stride();
auto const start_idx = cudf::detail::grid_1d::global_thread_id();

if (0 == lane_id) { current_idx_shared[warp_id] = 0; }

__syncwarp();

unsigned int const activemask = __ballot_sync(0xffff'ffffu, start_idx < outer_num_rows);

auto evaluator = cudf::ast::detail::expression_evaluator<has_nulls>(
left_table, right_table, device_expression_data);

for (cudf::thread_index_type outer_row_index = start_idx; outer_row_index < outer_num_rows;
outer_row_index += stride) {
bool found_match = false;
for (thread_index_type inner_row_index(0); inner_row_index < inner_num_rows;
++inner_row_index) {
auto output_dest = cudf::ast::detail::value_expression_result<bool, has_nulls>();

evaluator.evaluate(
output_dest, outer_row_index, inner_row_index, 0, thread_intermediate_storage);

if (output_dest.is_valid() && output_dest.value()) {
if (join_type == join_kind::LEFT_SEMI_JOIN && !found_match) {
add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
}
found_match = true;
Comment (bdice, Contributor, Apr 19, 2024):

Once found_match is true for an outer_row_index, are we allowed to quit evaluating more inner rows for matches? It seems like we should be able to trigger the flush code and then go to the next outer_row_index. Both SEMI and ANTI joins check that found_match is false before adding an outer_row_index, but it seems like they would continue evaluating other inner rows anyway (but they shouldn't have to do so). Does that sound right?

Reply (ZelboK, Contributor, Apr 20, 2024):

Oh, good catch! Yeah, there is no need at all for it to continue searching that space after it finds a match. I am really curious about the speed improvements this change will make. Let me benchmark before I push up.

}

__syncwarp(activemask);

auto const do_flush = current_idx_shared[warp_id] + detail::warp_size >= output_cache_size;
auto const flush_mask = __ballot_sync(activemask, do_flush);
if (do_flush) {
flush_output_cache<num_warps, output_cache_size>(flush_mask,
max_size,
warp_id,
lane_id,
current_idx,
current_idx_shared,
join_shared_l,
join_output_l);
__syncwarp(flush_mask);
if (0 == lane_id) { current_idx_shared[warp_id] = 0; }
}
__syncwarp(activemask);
// A match settles the semi/anti decision for this outer row, so stop scanning
// the remaining inner rows (see the review thread above).
if (found_match) break;
}

if ((join_type == join_kind::LEFT_ANTI_JOIN) && (!found_match)) {
add_left_to_cache(outer_row_index, current_idx_shared, warp_id, join_shared_l[warp_id]);
}

__syncwarp(activemask);

// Flush whatever this warp has cached for the current outer row and reset the
// cache so it starts empty for the next grid-stride iteration.
auto const do_flush = current_idx_shared[warp_id] > 0;
auto const flush_mask = __ballot_sync(activemask, do_flush);
if (do_flush) {
flush_output_cache<num_warps, output_cache_size>(flush_mask,
max_size,
warp_id,
lane_id,
current_idx,
current_idx_shared,
join_shared_l,
join_output_l);
__syncwarp(flush_mask);
if (0 == lane_id) { current_idx_shared[warp_id] = 0; }
}
}
}

} // namespace detail

} // namespace cudf
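For reference, the question raised in the review thread inside conditional_join_anti_semi concerns when the inner scan may stop. The host-side sketch below (plain C++, not cudf, with hypothetical names) spells out the semantics the kernel implements: a left semi join keeps a left row as soon as one right row satisfies the predicate, a left anti join keeps it only if no right row does, and in both cases the scan over right rows can stop at the first match. The equality test is a stand-in for the conditional (AST) predicate.

```cpp
#include <cstdio>
#include <vector>

enum class join_kind { LEFT_SEMI_JOIN, LEFT_ANTI_JOIN };

// Reference semantics: for each left row, scan right rows only until the
// predicate first holds. SEMI keeps left rows with at least one match;
// ANTI keeps left rows with none.
std::vector<int> semi_anti_join(std::vector<int> const& left,
                                std::vector<int> const& right,
                                join_kind kind)
{
  std::vector<int> out;
  for (int i = 0; i < static_cast<int>(left.size()); ++i) {
    bool found_match = false;
    for (int j = 0; j < static_cast<int>(right.size()); ++j) {
      if (left[i] == right[j]) {  // stand-in for the conditional predicate
        found_match = true;
        break;                    // no need to keep scanning once a match exists
      }
    }
    if ((kind == join_kind::LEFT_SEMI_JOIN && found_match) ||
        (kind == join_kind::LEFT_ANTI_JOIN && !found_match)) {
      out.push_back(i);
    }
  }
  return out;
}

int main()
{
  std::vector<int> left{1, 2, 3, 4}, right{2, 4};
  for (int i : semi_anti_join(left, right, join_kind::LEFT_SEMI_JOIN)) std::printf("semi: %d\n", i);
  for (int i : semi_anti_join(left, right, join_kind::LEFT_ANTI_JOIN)) std::printf("anti: %d\n", i);
  return 0;
}
```
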
39 changes: 37 additions & 2 deletions cpp/src/join/join_common_utils.cuh
@@ -281,12 +281,21 @@ __inline__ __device__ void add_pair_to_cache(size_type const first,
size_type* joined_shared_r)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

// it's guaranteed to fit into the shared cache
joined_shared_l[my_current_idx] = first;
joined_shared_r[my_current_idx] = second;
}

__inline__ __device__ void add_left_to_cache(size_type const first,
size_type* current_idx_shared,
int const warp_id,
size_type* joined_shared_l)
{
size_type my_current_idx{atomicAdd(current_idx_shared + warp_id, size_type(1))};

joined_shared_l[my_current_idx] = first;
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
Expand All @@ -300,7 +309,7 @@ __device__ void flush_output_cache(unsigned int const activemask,
size_type* join_output_r)
{
// count how many active threads participating here which could be less than warp_size
int num_threads = __popc(activemask);
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }
@@ -322,6 +331,32 @@
}
}

template <int num_warps, cudf::size_type output_cache_size>
__device__ void flush_output_cache(unsigned int const activemask,
cudf::size_type const max_size,
int const warp_id,
int const lane_id,
cudf::size_type* current_idx,
cudf::size_type current_idx_shared[num_warps],
size_type join_shared_l[num_warps][output_cache_size],
size_type* join_output_l)
{
int const num_threads = __popc(activemask);
cudf::size_type output_offset = 0;

if (0 == lane_id) { output_offset = atomicAdd(current_idx, current_idx_shared[warp_id]); }

output_offset = cub::ShuffleIndex<detail::warp_size>(output_offset, 0, activemask);

for (int shared_out_idx = lane_id; shared_out_idx < current_idx_shared[warp_id];
shared_out_idx += num_threads) {
cudf::size_type thread_offset = output_offset + shared_out_idx;
if (thread_offset < max_size) {
join_output_l[thread_offset] = join_shared_l[warp_id][shared_out_idx];
}
}
}

} // namespace detail

} // namespace cudf
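The add_left_to_cache helper and the single-table flush_output_cache overload added above implement a per-warp output cache: threads append indices into shared memory with a cheap shared-memory atomicAdd, and the warp later flushes the cache to global memory with one global atomicAdd that reserves a contiguous slice. The standalone CUDA sketch below is not cudf code; it assumes every lane of the warp stays active, broadcasts the reserved offset with __shfl_sync rather than cub::ShuffleIndex, and flushes only once at the end because the cache cannot overflow in this toy setting (at most 32 appends per warp against a 64-entry cache).

```cuda
#include <cuda_runtime.h>
#include <cstdio>

constexpr int warp_size         = 32;
constexpr int block_size        = 128;
constexpr int num_warps         = block_size / warp_size;
constexpr int output_cache_size = 64;

// Collect even values using a per-warp shared-memory output cache that is
// flushed to global memory with a single global atomicAdd per warp.
__global__ void collect_even_values(int const* in, int n, int* out, int* out_count)
{
  __shared__ int cache_count[num_warps];
  __shared__ int cache[num_warps][output_cache_size];

  int const warp_id = threadIdx.x / warp_size;
  int const lane_id = threadIdx.x % warp_size;
  if (lane_id == 0) cache_count[warp_id] = 0;
  __syncwarp();

  int const i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n && in[i] % 2 == 0) {
    int const pos = atomicAdd(&cache_count[warp_id], 1);  // append to this warp's cache
    cache[warp_id][pos] = in[i];
  }
  __syncwarp();

  // Flush: lane 0 reserves a contiguous slice of the output, then all lanes copy.
  int base = 0;
  if (lane_id == 0) base = atomicAdd(out_count, cache_count[warp_id]);
  base = __shfl_sync(0xffffffffu, base, 0);
  for (int k = lane_id; k < cache_count[warp_id]; k += warp_size) {
    out[base + k] = cache[warp_id][k];
  }
}

int main()
{
  constexpr int n = 1000;
  int h_in[n];
  for (int i = 0; i < n; ++i) h_in[i] = i;
  int *d_in, *d_out, *d_count;
  cudaMalloc(&d_in, n * sizeof(int));
  cudaMalloc(&d_out, n * sizeof(int));
  cudaMalloc(&d_count, sizeof(int));
  cudaMemcpy(d_in, h_in, n * sizeof(int), cudaMemcpyHostToDevice);
  cudaMemset(d_count, 0, sizeof(int));
  collect_even_values<<<(n + block_size - 1) / block_size, block_size>>>(d_in, n, d_out, d_count);
  int count = 0;
  cudaMemcpy(&count, d_count, sizeof(int), cudaMemcpyDeviceToHost);
  std::printf("collected %d even values\n", count);  // expect 500
  cudaFree(d_in); cudaFree(d_out); cudaFree(d_count);
  return 0;
}
```

The payoff of the pattern is reduced contention: one global atomicAdd per warp flush instead of one per emitted index.
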