Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Avoid inserting null elements into join hash table when nulls are treated as unequal #6943

Merged
merged 19 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1e1086e
Initial pass at fixing nulls not equal performance with joins
hyperbolic2346 Dec 8, 2020
2020b4c
Merge remote-tracking branch 'upstream/branch-0.18' into mwilson/join…
hyperbolic2346 Dec 8, 2020
95f791f
adding changelog
hyperbolic2346 Dec 8, 2020
7db4f1b
Apply suggestions from code review
hyperbolic2346 Dec 8, 2020
539f461
removing stale comments
hyperbolic2346 Dec 9, 2020
cb88696
Merge branch 'mwilson/join_nulls' of github.com:hyperbolic2346/cudf i…
hyperbolic2346 Dec 9, 2020
8af46b2
attempting to clarify parameter comments
hyperbolic2346 Dec 9, 2020
e1ee434
moving device view creation up into build_join_hash_table and only pa…
hyperbolic2346 Dec 10, 2020
1e6461d
Merge remote-tracking branch 'upstream/branch-0.18' into mwilson/join…
hyperbolic2346 Dec 10, 2020
3d77c73
updating based on review
hyperbolic2346 Dec 10, 2020
e2f2810
Apply suggestions from code review
hyperbolic2346 Dec 10, 2020
9ab1e19
Adding srand per review comments
hyperbolic2346 Dec 10, 2020
6b0e5d4
linting
hyperbolic2346 Dec 10, 2020
2e233da
review comment changes
hyperbolic2346 Dec 10, 2020
5b66b19
Merge remote-tracking branch 'upstream/branch-0.18' into mwilson/join…
hyperbolic2346 Dec 10, 2020
e57855a
Apply suggestions from code review
hyperbolic2346 Dec 11, 2020
d300ad6
Merge remote-tracking branch 'upstream/branch-0.18' into mwilson/join…
hyperbolic2346 Dec 11, 2020
a458a84
cleanup and fixes for UniformRandomGenerator
hyperbolic2346 Dec 11, 2020
39403b2
linting
hyperbolic2346 Dec 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

- PR #6922 Fix N/A detection for empty fields in CSV reader
- PR #6912 Fix rmm_mode=managed parameter for gtests
- PR #6943 Fix join with nulls not equal performance
- PR #6945 Fix groupby agg/apply behaviour when no key columns are provided
- PR #6942 Fix cudf::merge gtest for dictionary columns

Expand Down
73 changes: 60 additions & 13 deletions cpp/benchmarks/join/join_benchmark.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <fixture/benchmark_fixture.hpp>
Expand All @@ -36,7 +37,7 @@ template <typename key_type, typename payload_type>
class Join : public cudf::benchmark {
};

template <typename key_type, typename payload_type>
template <typename key_type, typename payload_type, bool Nullable>
static void BM_join(benchmark::State &state)
{
const cudf::size_type build_table_size{(cudf::size_type)state.range(0)};
Expand All @@ -46,11 +47,33 @@ static void BM_join(benchmark::State &state)
const bool is_build_table_key_unique = true;

// Generate build and probe tables

auto build_key_column =
cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()), build_table_size);
auto probe_key_column =
cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()), probe_table_size);
cudf::test::UniformRandomGenerator<cudf::size_type> rand_gen(0, build_table_size);
auto build_random_null_mask = [&rand_gen](int size) {
if (Nullable) {
// roughly 25% nulls
auto validity = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
[&rand_gen](auto i) { return (rand_gen.generate() & 3) == 0; });
return cudf::test::detail::make_null_mask(validity, validity + size);
} else {
return cudf::create_null_mask(size, cudf::mask_state::UNINITIALIZED);
}
};

std::unique_ptr<cudf::column> build_key_column = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size,
build_random_null_mask(build_table_size))
: cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size);
}();
std::unique_ptr<cudf::column> probe_key_column = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
probe_table_size,
build_random_null_mask(probe_table_size))
: cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
probe_table_size);
}();

generate_input_tables<key_type, cudf::size_type>(
build_key_column->mutable_view().data<key_type>(),
Expand Down Expand Up @@ -82,17 +105,23 @@ static void BM_join(benchmark::State &state)
for (auto _ : state) {
cuda_event_timer raii(state, true, 0);

auto result =
cudf::inner_join(probe_table, build_table, columns_to_join, columns_to_join, {{0, 0}});
auto result = cudf::inner_join(probe_table,
build_table,
columns_to_join,
columns_to_join,
{{0, 0}},
cudf::null_equality::UNEQUAL);
}
}

#define JOIN_BENCHMARK_DEFINE(name, key_type, payload_type) \
BENCHMARK_TEMPLATE_DEFINE_F(Join, name, key_type, payload_type) \
(::benchmark::State & st) { BM_join<key_type, payload_type>(st); }
#define JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \
BENCHMARK_TEMPLATE_DEFINE_F(Join, name, key_type, payload_type) \
(::benchmark::State & st) { BM_join<key_type, payload_type, nullable>(st); }

JOIN_BENCHMARK_DEFINE(join_32bit, int32_t, int32_t);
JOIN_BENCHMARK_DEFINE(join_64bit, int64_t, int64_t);
JOIN_BENCHMARK_DEFINE(join_32bit, int32_t, int32_t, false);
JOIN_BENCHMARK_DEFINE(join_64bit, int64_t, int64_t, false);
JOIN_BENCHMARK_DEFINE(join_32bit_nulls, int32_t, int32_t, true);
JOIN_BENCHMARK_DEFINE(join_64bit_nulls, int64_t, int64_t, true);

BENCHMARK_REGISTER_F(Join, join_32bit)
->Unit(benchmark::kMillisecond)
Expand All @@ -111,3 +140,21 @@ BENCHMARK_REGISTER_F(Join, join_64bit)
->Args({50'000'000, 50'000'000})
->Args({40'000'000, 120'000'000})
->UseManualTime();

BENCHMARK_REGISTER_F(Join, join_32bit_nulls)
->Unit(benchmark::kMillisecond)
->Args({100'000, 100'000})
->Args({100'000, 400'000})
->Args({100'000, 1'000'000})
->Args({10'000'000, 10'000'000})
->Args({10'000'000, 40'000'000})
->Args({10'000'000, 100'000'000})
->Args({100'000'000, 100'000'000})
->Args({80'000'000, 240'000'000})
->UseManualTime();

BENCHMARK_REGISTER_F(Join, join_64bit_nulls)
->Unit(benchmark::kMillisecond)
->Args({50'000'000, 50'000'000})
->Args({40'000'000, 120'000'000})
->UseManualTime();
2 changes: 2 additions & 0 deletions cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,12 @@ class hash_join {
*
* @param build The build table, from which the hash table is built.
* @param build_on The column indices from `build` to join on.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches
*/
hash_join(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
Expand Down
28 changes: 19 additions & 9 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,21 @@ get_left_join_indices_complement(rmm::device_vector<size_type> &right_indices,
* @throw std::out_of_range if elements of `build_on` exceed the number of columns in the `build`
* table.
*
* @param build_table Table of build side columns to join.
* @param build Table of columns used to build join hash.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return Built hash table.
*/
std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_hash_table(
cudf::table_device_view build_table, rmm::cuda_stream_view stream)
cudf::table_view const &build, null_equality compare_nulls, rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(0 != build_table.num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table.num_rows(), "Build side table has no rows");
auto build_device_table = cudf::table_device_view::create(build, stream);

const size_type build_table_num_rows{build_table.num_rows()};
CUDF_EXPECTS(0 != build_device_table->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_device_table->num_rows(), "Build side table has no rows");

size_type const build_table_num_rows{build_device_table->num_rows()};
size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);

auto hash_table = multimap_type::create(hash_table_size,
Expand All @@ -219,12 +222,19 @@ std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_
multimap_type::key_equal(),
multimap_type::allocator_type());

row_hash hash_build{build_table};
row_hash hash_build{*build_device_table};
rmm::device_scalar<int> failure(0, stream);
constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE};
detail::grid_1d config(build_table_num_rows, block_size);
auto const row_bitmask = (compare_nulls == null_equality::EQUAL)
? rmm::device_buffer{0, stream}
: cudf::detail::bitmask_and(build, stream);
build_hash_table<<<config.num_blocks, config.num_threads_per_block, 0, stream.value()>>>(
*hash_table, hash_build, build_table_num_rows, failure.data());
*hash_table,
hash_build,
build_table_num_rows,
static_cast<bitmask_type const *>(row_bitmask.data()),
failure.data());
// Check error code from the kernel
if (failure.value(stream) == 1) { CUDF_FAIL("Hash Table insert failure."); }

Expand Down Expand Up @@ -488,6 +498,7 @@ hash_join::hash_join_impl::~hash_join_impl() = default;

hash_join::hash_join_impl::hash_join_impl(cudf::table_view const &build,
std::vector<size_type> const &build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
: _build(build),
_build_selected(build.select(build_on)),
Expand All @@ -501,8 +512,7 @@ hash_join::hash_join_impl::hash_join_impl(cudf::table_view const &build,

if (_build_on.empty() || 0 == build.num_rows()) { return; }

auto build_table = cudf::table_device_view::create(_build_selected, stream);
_hash_table = build_join_hash_table(*build_table, stream);
_hash_table = build_join_hash_table(_build_selected, compare_nulls, stream);
}

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>>
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,11 @@ struct hash_join::hash_join_impl {
*
* @param build The build table, from which the hash table is built.
* @param build_on The column indices from `build` to join on.
* @param compare_nulls Controls whether null join-key values should match or not.
*/
hash_join_impl(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> inner_join(
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/join/join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::unique_ptr<table> inner_join(
// building/probing the hash map. Because building is typically more expensive than probing, we
// build the hash map from the smaller table.
if (right.num_rows() > left.num_rows()) {
cudf::hash_join hj_obj(left, left_on, stream);
cudf::hash_join hj_obj(left, left_on, compare_nulls, stream);
auto actual_columns_in_common = columns_in_common;
std::for_each(actual_columns_in_common.begin(), actual_columns_in_common.end(), [](auto& pair) {
std::swap(pair.first, pair.second);
Expand All @@ -66,7 +66,7 @@ std::unique_ptr<table> inner_join(
return cudf::detail::combine_table_pair(std::move(probe_build_pair.second),
std::move(probe_build_pair.first));
} else {
cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
auto probe_build_pair = hj_obj.inner_join(left,
left_on,
columns_in_common,
Expand Down Expand Up @@ -99,7 +99,7 @@ std::unique_ptr<table> left_join(
table_view const left = scatter_columns(matched.second.front(), left_on, left_input);
table_view const right = scatter_columns(matched.second.back(), right_on, right_input);

cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
return hj_obj.left_join(left, left_on, columns_in_common, compare_nulls, stream, mr);
}

Expand All @@ -123,7 +123,7 @@ std::unique_ptr<table> full_join(
table_view const left = scatter_columns(matched.second.front(), left_on, left_input);
table_view const right = scatter_columns(matched.second.back(), right_on, right_input);

cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
return hj_obj.full_join(left, left_on, columns_in_common, compare_nulls, stream, mr);
}

Expand All @@ -133,8 +133,9 @@ hash_join::~hash_join() = default;

hash_join::hash_join(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
: impl{std::make_unique<const hash_join::hash_join_impl>(build, build_on, stream)}
: impl{std::make_unique<const hash_join::hash_join_impl>(build, build_on, compare_nulls, stream)}
{
}

Expand Down
28 changes: 17 additions & 11 deletions cpp/src/join/join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <cub/cub.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/table/table_device_view.cuh>

Expand Down Expand Up @@ -71,28 +72,33 @@ constexpr auto remap_sentinel_hash(H hash, S sentinel)
* @param[in,out] multi_map The hash table to be built to insert rows into
* @param[in] hash_build Row hasher for the build table
* @param[in] build_table_num_rows The number of rows in the build table
* @param[in] row_bitmask Bitmask where bit `i` indicates the presence of a null
* value in row `i` of input keys. This is nullptr if nulls are equal.
* @param[out] error Pointer used to set an error code if the insert fails
*/
template <typename multimap_type>
__global__ void build_hash_table(multimap_type multi_map,
row_hash hash_build,
const cudf::size_type build_table_num_rows,
bitmask_type const* row_bitmask,
int* error)
{
cudf::size_type i = threadIdx.x + blockIdx.x * blockDim.x;

while (i < build_table_num_rows) {
// Compute the hash value of this row
auto const row_hash_value = remap_sentinel_hash(hash_build(i), multi_map.get_unused_key());

// Insert the (row hash value, row index) into the map
// using the row hash value to determine the location in the
// hash map where the new pair should be inserted
const auto insert_location =
multi_map.insert(thrust::make_pair(row_hash_value, i), true, row_hash_value);

// If the insert failed, set the error code accordingly
if (multi_map.end() == insert_location) { *error = 1; }
if (!row_bitmask || cudf::bit_is_set(row_bitmask, i)) {
// Compute the hash value of this row
auto const row_hash_value = remap_sentinel_hash(hash_build(i), multi_map.get_unused_key());

// Insert the (row hash value, row index) into the map
// using the row hash value to determine the location in the
// hash map where the new pair should be inserted
auto const insert_location =
multi_map.insert(thrust::make_pair(row_hash_value, i), true, row_hash_value);

// If the insert failed, set the error code accordingly
if (multi_map.end() == insert_location) { *error = 1; }
}
i += blockDim.x * gridDim.x;
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/join/join_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ TEST_F(JoinTest, HashJoinSequentialProbes)

Table t1(std::move(cols1));

cudf::hash_join hash_join(t1, {0, 1});
cudf::hash_join hash_join(t1, {0, 1}, cudf::null_equality::EQUAL);

{
CVector cols0;
Expand Down