Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable join results with size > INT32_MAX #8139

Merged
merged 9 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cstddef>
#include <iostream>
#include <numeric>

Expand Down Expand Up @@ -176,7 +177,7 @@ std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_
CUDF_EXPECTS(0 != build_device_table->num_rows(), "Build side table has no rows");

size_type const build_table_num_rows{build_device_table->num_rows()};
size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);
std::size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);

auto hash_table = multimap_type::create(hash_table_size,
stream,
Expand Down Expand Up @@ -228,7 +229,7 @@ probe_join_hash_table(cudf::table_device_view build_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
size_type estimated_size = estimate_join_output_size<JoinKind, multimap_type>(
std::size_t estimated_size = estimate_join_output_size<JoinKind, multimap_type>(
build_table, probe_table, hash_table, compare_nulls, stream);

// If the estimated output size is zero, return immediately
Expand All @@ -242,7 +243,7 @@ probe_join_hash_table(cudf::table_device_view build_table,
// As such we will need to de-allocate memory and re-allocate memory to ensure
// that the final output is correct.
rmm::device_scalar<size_type> write_index(0, stream);
size_type join_size{0};
std::size_t join_size{0};

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
Expand Down
26 changes: 10 additions & 16 deletions cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include <thrust/sequence.h>

#include <cstddef>
#include <limits>

namespace cudf {
Expand Down Expand Up @@ -62,14 +63,12 @@ namespace detail {
* @return An estimate of the size of the output of the join operation
*/
template <join_kind JoinKind, typename multimap_type>
size_type estimate_join_output_size(table_device_view build_table,
table_device_view probe_table,
multimap_type const& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
std::size_t estimate_join_output_size(table_device_view build_table,
table_device_view probe_table,
multimap_type const& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
{
using estimate_size_type = int64_t; // use 64-bit size so we can detect overflow

const size_type build_table_num_rows{build_table.num_rows()};
const size_type probe_table_num_rows{probe_table.num_rows()};

Expand Down Expand Up @@ -100,8 +99,8 @@ size_type estimate_join_output_size(table_device_view build_table,
if (probe_to_build_ratio > MAX_RATIO) { sample_probe_num_rows = build_table_num_rows; }

// Allocate storage for the counter used to get the size of the join output
estimate_size_type h_size_estimate{0};
rmm::device_scalar<estimate_size_type> size_estimate(0, stream);
std::size_t h_size_estimate{0};
rmm::device_scalar<std::size_t> size_estimate(0, stream);

CHECK_CUDA(stream.value());

Expand Down Expand Up @@ -148,11 +147,6 @@ size_type estimate_join_output_size(table_device_view build_table,
h_size_estimate = size_estimate.value(stream);
}

// Detect overflow
CUDF_EXPECTS(h_size_estimate <
static_cast<estimate_size_type>(std::numeric_limits<cudf::size_type>::max()),
"Maximum join output size exceeded");

// If the size estimate is non-zero, then we have a valid estimate and can break
// If sample_probe_num_rows >= probe_table_num_rows, then we've sampled the entire
// probe table, in which case the estimate is exact and we can break
Expand All @@ -165,12 +159,12 @@ size_type estimate_join_output_size(table_device_view build_table,
constexpr size_type GROW_RATIO{2};
sample_probe_num_rows *= GROW_RATIO;
probe_to_build_ratio =
static_cast<size_type>(std::ceil(static_cast<float>(probe_to_build_ratio) / GROW_RATIO));
static_cast<size_t>(std::ceil(static_cast<float>(probe_to_build_ratio) / GROW_RATIO));
}

} while (true);

return static_cast<cudf::size_type>(h_size_estimate);
return h_size_estimate;
}

/**
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/join/join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <cstddef>
#include <cub/cub.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
Expand Down Expand Up @@ -119,17 +120,14 @@ __global__ void build_hash_table(multimap_type multi_map,
* @param[in] probe_table_num_rows The number of rows in the probe table
* @param[out] output_size The resulting output size
*/
template <join_kind JoinKind,
typename multimap_type,
int block_size,
typename estimate_size_type = int64_t>
template <join_kind JoinKind, typename multimap_type, int block_size>
__global__ void compute_join_output_size(multimap_type multi_map,
table_device_view build_table,
table_device_view probe_table,
row_hash hash_probe,
row_equality check_row_equality,
const cudf::size_type probe_table_num_rows,
estimate_size_type* output_size)
std::size_t* output_size)
{
// This kernel probes multiple elements in the probe_table and store the number of matches found
// inside a register. A block reduction is used at the end to calculate the matches per thread
Expand Down Expand Up @@ -193,9 +191,9 @@ __global__ void compute_join_output_size(multimap_type multi_map,
}
}

using BlockReduce = cub::BlockReduce<estimate_size_type, block_size>;
using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
estimate_size_type block_counter = BlockReduce(temp_storage).Sum(thread_counter);
std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

// Add block counter to global counter
if (threadIdx.x == 0) atomicAdd(output_size, block_counter);
Expand Down Expand Up @@ -311,7 +309,7 @@ __global__ void probe_hash_table(multimap_type multi_map,
size_type* join_output_l,
size_type* join_output_r,
cudf::size_type* current_idx,
const cudf::size_type max_size)
const std::size_t max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ size_type current_idx_shared[num_warps];
Expand Down
23 changes: 0 additions & 23 deletions cpp/tests/join/join_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,29 +607,6 @@ TEST_F(JoinTest, LeftJoinOnNulls)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result);
}

TEST_F(JoinTest, InnerJoinSizeOverflow)
{
auto zero = cudf::make_numeric_scalar(cudf::data_type(cudf::type_id::INT32));
zero->set_valid(true);
static_cast<cudf::scalar_type_t<int32_t> *>(zero.get())->set_value(0);

// Should cause size overflow, raise exception
int32_t left = 4;
int32_t right = 1073741825;

auto col0_0 = cudf::make_column_from_scalar(*zero, left);
auto col1_0 = cudf::make_column_from_scalar(*zero, right);

CVector cols0, cols1;
cols0.push_back(std::move(col0_0));
cols1.push_back(std::move(col1_0));

Table t0(std::move(cols0));
Table t1(std::move(cols1));

EXPECT_THROW(cudf::inner_join(t0, t1, {0}, {0}), cudf::logic_error);
}

TEST_F(JoinTest, InnerJoinNoNulls)
{
column_wrapper<int32_t> col0_0{{3, 1, 2, 0, 2}};
Expand Down