Enable join results with size > INT32_MAX (#8139)
Closes #8121

Authors:
  - Ashwin Srinath (https://github.com/shwina)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Jake Hemstad (https://github.com/jrhemstad)
  - Nghia Truong (https://github.com/ttnghia)
  - Robert (Bobby) Evans (https://github.com/revans2)

URL: #8139
shwina authored May 5, 2021
1 parent c0f8176 commit 8cba3b0
Showing 4 changed files with 20 additions and 50 deletions.
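
In short, join output sizes that were previously carried as cudf::size_type (a 32-bit int), with a 64-bit estimate_size_type used only to detect overflow, are now carried as std::size_t throughout; the "Maximum join output size exceeded" check and the InnerJoinSizeOverflow test that exercised it are removed accordingly. The snippet below is a minimal host-only sketch, not part of the diff, showing why the old 32-bit limit mattered, using the row counts from the removed test:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>

int main()
{
  // Row counts taken from the removed InnerJoinSizeOverflow test: a 4-row
  // table inner-joined with a 1,073,741,825-row table on all-zero keys
  // produces 4 * 1,073,741,825 = 4,294,967,300 result rows.
  std::size_t const left_rows  = 4;
  std::size_t const right_rows = 1'073'741'825;
  std::size_t const join_rows  = left_rows * right_rows;

  std::cout << "join rows : " << join_rows << '\n';                                 // 4294967300
  std::cout << "int32 max : " << std::numeric_limits<std::int32_t>::max() << '\n';  // 2147483647
  // The result no longer fits in a 32-bit size_type, but easily fits in
  // std::size_t, which is 64 bits wide on the platforms libcudf supports.
}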
7 changes: 4 additions & 3 deletions cpp/src/join/hash_join.cu
@@ -25,6 +25,7 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cstddef>
#include <iostream>
#include <numeric>

@@ -176,7 +177,7 @@ std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_
CUDF_EXPECTS(0 != build_device_table->num_rows(), "Build side table has no rows");

size_type const build_table_num_rows{build_device_table->num_rows()};
size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);
std::size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);

auto hash_table = multimap_type::create(hash_table_size,
stream,
@@ -228,7 +229,7 @@ probe_join_hash_table(cudf::table_device_view build_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
size_type estimated_size = estimate_join_output_size<JoinKind, multimap_type>(
std::size_t estimated_size = estimate_join_output_size<JoinKind, multimap_type>(
build_table, probe_table, hash_table, compare_nulls, stream);

// If the estimated output size is zero, return immediately
@@ -242,7 +243,7 @@ probe_join_hash_table(cudf::table_device_view build_table,
// As such we will need to de-allocate memory and re-allocate memory to ensure
// that the final output is correct.
rmm::device_scalar<size_type> write_index(0, stream);
size_type join_size{0};
std::size_t join_size{0};

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
auto right_indices = std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
26 changes: 10 additions & 16 deletions cpp/src/join/hash_join.cuh
@@ -33,6 +33,7 @@

#include <thrust/sequence.h>

#include <cstddef>
#include <limits>

namespace cudf {
@@ -62,14 +63,12 @@ namespace detail {
* @return An estimate of the size of the output of the join operation
*/
template <join_kind JoinKind, typename multimap_type>
size_type estimate_join_output_size(table_device_view build_table,
table_device_view probe_table,
multimap_type const& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
std::size_t estimate_join_output_size(table_device_view build_table,
table_device_view probe_table,
multimap_type const& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
{
using estimate_size_type = int64_t; // use 64-bit size so we can detect overflow

const size_type build_table_num_rows{build_table.num_rows()};
const size_type probe_table_num_rows{probe_table.num_rows()};

@@ -100,8 +99,8 @@ size_type estimate_join_output_size(table_device_view build_table,
if (probe_to_build_ratio > MAX_RATIO) { sample_probe_num_rows = build_table_num_rows; }

// Allocate storage for the counter used to get the size of the join output
estimate_size_type h_size_estimate{0};
rmm::device_scalar<estimate_size_type> size_estimate(0, stream);
std::size_t h_size_estimate{0};
rmm::device_scalar<std::size_t> size_estimate(0, stream);

CHECK_CUDA(stream.value());

@@ -148,11 +147,6 @@ size_type estimate_join_output_size(table_device_view build_table,
h_size_estimate = size_estimate.value(stream);
}

// Detect overflow
CUDF_EXPECTS(h_size_estimate <
static_cast<estimate_size_type>(std::numeric_limits<cudf::size_type>::max()),
"Maximum join output size exceeded");

// If the size estimate is non-zero, then we have a valid estimate and can break
// If sample_probe_num_rows >= probe_table_num_rows, then we've sampled the entire
// probe table, in which case the estimate is exact and we can break
@@ -165,12 +159,12 @@ size_type estimate_join_output_size(table_device_view build_table,
constexpr size_type GROW_RATIO{2};
sample_probe_num_rows *= GROW_RATIO;
probe_to_build_ratio =
static_cast<size_type>(std::ceil(static_cast<float>(probe_to_build_ratio) / GROW_RATIO));
static_cast<size_t>(std::ceil(static_cast<float>(probe_to_build_ratio) / GROW_RATIO));
}

} while (true);

return static_cast<cudf::size_type>(h_size_estimate);
return h_size_estimate;
}

/**
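For context, estimate_join_output_size samples a prefix of the probe table, counts matches on the device, and grows the sample by GROW_RATIO until the estimate is non-zero or the whole probe table has been sampled; the change above keeps that estimate in std::size_t instead of clamping it to cudf::size_type, which is why the overflow guard disappears. The host-side sketch below is illustrative only: count_sampled_matches is a hypothetical stand-in for the compute_join_output_size kernel launch, and the assumption that the sampled count is scaled up by probe_to_build_ratio is inferred from the ratio bookkeeping visible in the loop.

#include <algorithm>
#include <cmath>
#include <cstddef>

// Rough, simplified sketch of the sampling loop in estimate_join_output_size.
template <typename CountFn>
std::size_t estimate_output_size_sketch(std::size_t probe_rows,
                                        std::size_t build_rows,
                                        CountFn count_sampled_matches)
{
  // Sample at most build_rows probe rows to start, and remember how much the
  // sampled count has to be scaled up to cover the whole probe table.
  std::size_t sample_rows = std::min(probe_rows, build_rows);
  std::size_t probe_to_build_ratio =
    static_cast<std::size_t>(std::ceil(static_cast<float>(probe_rows) / build_rows));

  std::size_t estimate = 0;
  while (true) {
    // Count matches for the sampled prefix, then scale up (assumed behaviour).
    estimate = count_sampled_matches(sample_rows) * probe_to_build_ratio;

    // A non-zero estimate, or having sampled every probe row, ends the loop.
    if (estimate > 0 || sample_rows >= probe_rows) { break; }

    // Otherwise sample twice as many rows and shrink the scale factor to match.
    constexpr std::size_t GROW_RATIO{2};
    sample_rows          = std::min(sample_rows * GROW_RATIO, probe_rows);
    probe_to_build_ratio = static_cast<std::size_t>(
      std::ceil(static_cast<float>(probe_to_build_ratio) / GROW_RATIO));
  }
  return estimate;
}

Because the estimate is now std::size_t end to end, no comparison against std::numeric_limits<cudf::size_type>::max() is needed before returning it.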
14 changes: 6 additions & 8 deletions cpp/src/join/join_kernels.cuh
@@ -16,6 +16,7 @@

#pragma once

#include <cstddef>
#include <cub/cub.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
@@ -119,17 +120,14 @@ __global__ void build_hash_table(multimap_type multi_map,
* @param[in] probe_table_num_rows The number of rows in the probe table
* @param[out] output_size The resulting output size
*/
template <join_kind JoinKind,
typename multimap_type,
int block_size,
typename estimate_size_type = int64_t>
template <join_kind JoinKind, typename multimap_type, int block_size>
__global__ void compute_join_output_size(multimap_type multi_map,
table_device_view build_table,
table_device_view probe_table,
row_hash hash_probe,
row_equality check_row_equality,
const cudf::size_type probe_table_num_rows,
estimate_size_type* output_size)
std::size_t* output_size)
{
// This kernel probes multiple elements in the probe_table and store the number of matches found
// inside a register. A block reduction is used at the end to calculate the matches per thread
@@ -193,9 +191,9 @@ __global__ void compute_join_output_size(multimap_type multi_map,
}
}

using BlockReduce = cub::BlockReduce<estimate_size_type, block_size>;
using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
__shared__ typename BlockReduce::TempStorage temp_storage;
estimate_size_type block_counter = BlockReduce(temp_storage).Sum(thread_counter);
std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

// Add block counter to global counter
if (threadIdx.x == 0) atomicAdd(output_size, block_counter);
@@ -311,7 +309,7 @@ __global__ void probe_hash_table(multimap_type multi_map,
size_type* join_output_l,
size_type* join_output_r,
cudf::size_type* current_idx,
const cudf::size_type max_size)
const std::size_t max_size)
{
constexpr int num_warps = block_size / detail::warp_size;
__shared__ size_type current_idx_shared[num_warps];
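On the device side, the output size is accumulated with the pattern visible in the diff: each thread keeps its own match count, cub::BlockReduce sums the block, and thread 0 of each block adds the block total to a global counter with atomicAdd; the change simply widens those values to std::size_t. Below is a self-contained sketch of that pattern, not the cudf kernel itself; it assumes std::size_t has the same width as unsigned long long (true on the 64-bit platforms libcudf supports), since device atomicAdd is defined for unsigned long long rather than std::size_t.

#include <cstddef>
#include <cub/cub.cuh>

// Sketch of the counting pattern in compute_join_output_size: per-thread tally,
// block-wide cub::BlockReduce, then one atomicAdd per block into *output_size.
template <int block_size>
__global__ void count_matches_sketch(std::size_t const* per_thread_matches,
                                     std::size_t n,
                                     std::size_t* output_size)
{
  std::size_t thread_counter = 0;
  for (std::size_t i = static_cast<std::size_t>(blockIdx.x) * block_size + threadIdx.x;
       i < n;
       i += static_cast<std::size_t>(gridDim.x) * block_size) {
    thread_counter += per_thread_matches[i];  // stand-in for probing the hash table
  }

  using BlockReduce = cub::BlockReduce<std::size_t, block_size>;
  __shared__ typename BlockReduce::TempStorage temp_storage;
  std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter);

  // Only thread 0 holds the valid block-wide sum after BlockReduce::Sum.
  if (threadIdx.x == 0) {
    atomicAdd(reinterpret_cast<unsigned long long*>(output_size),
              static_cast<unsigned long long>(block_counter));
  }
}

The host then reads the accumulated count back, as the probe path above does with size_estimate.value(stream) on an rmm::device_scalar<std::size_t>.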
23 changes: 0 additions & 23 deletions cpp/tests/join/join_tests.cpp
@@ -607,29 +607,6 @@ TEST_F(JoinTest, LeftJoinOnNulls)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result);
}

TEST_F(JoinTest, InnerJoinSizeOverflow)
{
auto zero = cudf::make_numeric_scalar(cudf::data_type(cudf::type_id::INT32));
zero->set_valid(true);
static_cast<cudf::scalar_type_t<int32_t> *>(zero.get())->set_value(0);

// Should cause size overflow, raise exception
int32_t left = 4;
int32_t right = 1073741825;

auto col0_0 = cudf::make_column_from_scalar(*zero, left);
auto col1_0 = cudf::make_column_from_scalar(*zero, right);

CVector cols0, cols1;
cols0.push_back(std::move(col0_0));
cols1.push_back(std::move(col1_0));

Table t0(std::move(cols0));
Table t1(std::move(cols1));

EXPECT_THROW(cudf::inner_join(t0, t1, {0}, {0}), cudf::logic_error);
}

TEST_F(JoinTest, InnerJoinNoNulls)
{
column_wrapper<int32_t> col0_0{{3, 1, 2, 0, 2}};
