Skip to content

Commit

Permalink
Avoid inserting null elements into join hash table when nulls are tre…
Browse files Browse the repository at this point in the history
…ated as unequal (#6943)

This change mirrors what is done in `groupby`: when nulls are treated as unequal, rows that contain a null in a join key are not inserted into the join hash table. This prevents the process from running away. I added benchmarks for joins with nulls, and without these changes I can't even get them to finish: the test that takes 195 ms without nulls takes 2,000,000 ms to complete, and I haven't had the patience to see the larger tests complete. With this change, joins with nulls are faster than the non-null version in proportion to the percentage of nulls. For example, if half the table is null, the query is about twice as fast as the non-null version, which makes sense.

closes #6052

Authors:
  - Mike Wilson <[email protected]>
  - Mike Wilson <[email protected]>

Approvers:
  - Jake Hemstad
  - Jake Hemstad
  - null
  - Mark Harris

URL: #6943
  • Loading branch information
hyperbolic2346 authored Dec 11, 2020
1 parent 4c26155 commit ab8c931
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

- PR #6922 Fix N/A detection for empty fields in CSV reader
- PR #6912 Fix rmm_mode=managed parameter for gtests
- PR #6943 Fix join with nulls not equal performance
- PR #6945 Fix groupby agg/apply behaviour when no key columns are provided
- PR #6942 Fix cudf::merge gtest for dictionary columns

Expand Down
73 changes: 60 additions & 13 deletions cpp/benchmarks/join/join_benchmark.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>

#include <fixture/benchmark_fixture.hpp>
Expand All @@ -36,7 +37,7 @@ template <typename key_type, typename payload_type>
class Join : public cudf::benchmark {
};

template <typename key_type, typename payload_type>
template <typename key_type, typename payload_type, bool Nullable>
static void BM_join(benchmark::State &state)
{
const cudf::size_type build_table_size{(cudf::size_type)state.range(0)};
Expand All @@ -46,11 +47,33 @@ static void BM_join(benchmark::State &state)
const bool is_build_table_key_unique = true;

// Generate build and probe tables

auto build_key_column =
cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()), build_table_size);
auto probe_key_column =
cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()), probe_table_size);
cudf::test::UniformRandomGenerator<cudf::size_type> rand_gen(0, build_table_size);
// Builds the null mask for a benchmark key column of `size` elements.
// When `Nullable` is true the mask is built from a random validity sequence drawn from
// `rand_gen`; otherwise an UNINITIALIZED mask of `size` bits is created.
// In the code visible here, the lambda is only invoked from the `Nullable` branches of the
// key-column constructors.
auto build_random_null_mask = [&rand_gen](int size) {
if (Nullable) {
// The predicate is true only when the two low bits of the generated value are zero,
// i.e. for roughly 1 in 4 values (assuming the generator is uniform).
// NOTE(review): this was originally commented as "roughly 25% nulls". If `true` marks a
// VALID element (as the name `validity` suggests), this produces roughly 25% valid rows,
// i.e. ~75% nulls. Confirm which ratio is intended; use `!= 0` for ~25% nulls.
auto validity = thrust::make_transform_iterator(
thrust::make_counting_iterator(0),
[&rand_gen](auto i) { return (rand_gen.generate() & 3) == 0; });
return cudf::test::detail::make_null_mask(validity, validity + size);
} else {
return cudf::create_null_mask(size, cudf::mask_state::UNINITIALIZED);
}
};

std::unique_ptr<cudf::column> build_key_column = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size,
build_random_null_mask(build_table_size))
: cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size);
}();
std::unique_ptr<cudf::column> probe_key_column = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
probe_table_size,
build_random_null_mask(probe_table_size))
: cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
probe_table_size);
}();

generate_input_tables<key_type, cudf::size_type>(
build_key_column->mutable_view().data<key_type>(),
Expand Down Expand Up @@ -82,17 +105,23 @@ static void BM_join(benchmark::State &state)
for (auto _ : state) {
cuda_event_timer raii(state, true, 0);

auto result =
cudf::inner_join(probe_table, build_table, columns_to_join, columns_to_join, {{0, 0}});
auto result = cudf::inner_join(probe_table,
build_table,
columns_to_join,
columns_to_join,
{{0, 0}},
cudf::null_equality::UNEQUAL);
}
}

#define JOIN_BENCHMARK_DEFINE(name, key_type, payload_type) \
BENCHMARK_TEMPLATE_DEFINE_F(Join, name, key_type, payload_type) \
(::benchmark::State & st) { BM_join<key_type, payload_type>(st); }
#define JOIN_BENCHMARK_DEFINE(name, key_type, payload_type, nullable) \
BENCHMARK_TEMPLATE_DEFINE_F(Join, name, key_type, payload_type) \
(::benchmark::State & st) { BM_join<key_type, payload_type, nullable>(st); }

JOIN_BENCHMARK_DEFINE(join_32bit, int32_t, int32_t);
JOIN_BENCHMARK_DEFINE(join_64bit, int64_t, int64_t);
JOIN_BENCHMARK_DEFINE(join_32bit, int32_t, int32_t, false);
JOIN_BENCHMARK_DEFINE(join_64bit, int64_t, int64_t, false);
JOIN_BENCHMARK_DEFINE(join_32bit_nulls, int32_t, int32_t, true);
JOIN_BENCHMARK_DEFINE(join_64bit_nulls, int64_t, int64_t, true);

BENCHMARK_REGISTER_F(Join, join_32bit)
->Unit(benchmark::kMillisecond)
Expand All @@ -111,3 +140,21 @@ BENCHMARK_REGISTER_F(Join, join_64bit)
->Args({50'000'000, 50'000'000})
->Args({40'000'000, 120'000'000})
->UseManualTime();

BENCHMARK_REGISTER_F(Join, join_32bit_nulls)
->Unit(benchmark::kMillisecond)
->Args({100'000, 100'000})
->Args({100'000, 400'000})
->Args({100'000, 1'000'000})
->Args({10'000'000, 10'000'000})
->Args({10'000'000, 40'000'000})
->Args({10'000'000, 100'000'000})
->Args({100'000'000, 100'000'000})
->Args({80'000'000, 240'000'000})
->UseManualTime();

BENCHMARK_REGISTER_F(Join, join_64bit_nulls)
->Unit(benchmark::kMillisecond)
->Args({50'000'000, 50'000'000})
->Args({40'000'000, 120'000'000})
->UseManualTime();
2 changes: 2 additions & 0 deletions cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,12 @@ class hash_join {
*
* @param build The build table, from which the hash table is built.
* @param build_on The column indices from `build` to join on.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches
*/
hash_join(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
Expand Down
28 changes: 19 additions & 9 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,21 @@ get_left_join_indices_complement(rmm::device_vector<size_type> &right_indices,
* @throw std::out_of_range if elements of `build_on` exceed the number of columns in the `build`
* table.
*
* @param build_table Table of build side columns to join.
* @param build Table of columns used to build join hash.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
* @return Built hash table.
*/
std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_hash_table(
cudf::table_device_view build_table, rmm::cuda_stream_view stream)
cudf::table_view const &build, null_equality compare_nulls, rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(0 != build_table.num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table.num_rows(), "Build side table has no rows");
auto build_device_table = cudf::table_device_view::create(build, stream);

const size_type build_table_num_rows{build_table.num_rows()};
CUDF_EXPECTS(0 != build_device_table->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_device_table->num_rows(), "Build side table has no rows");

size_type const build_table_num_rows{build_device_table->num_rows()};
size_t const hash_table_size = compute_hash_table_size(build_table_num_rows);

auto hash_table = multimap_type::create(hash_table_size,
Expand All @@ -219,12 +222,19 @@ std::unique_ptr<multimap_type, std::function<void(multimap_type *)>> build_join_
multimap_type::key_equal(),
multimap_type::allocator_type());

row_hash hash_build{build_table};
row_hash hash_build{*build_device_table};
rmm::device_scalar<int> failure(0, stream);
constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE};
detail::grid_1d config(build_table_num_rows, block_size);
auto const row_bitmask = (compare_nulls == null_equality::EQUAL)
? rmm::device_buffer{0, stream}
: cudf::detail::bitmask_and(build, stream);
build_hash_table<<<config.num_blocks, config.num_threads_per_block, 0, stream.value()>>>(
*hash_table, hash_build, build_table_num_rows, failure.data());
*hash_table,
hash_build,
build_table_num_rows,
static_cast<bitmask_type const *>(row_bitmask.data()),
failure.data());
// Check error code from the kernel
if (failure.value(stream) == 1) { CUDF_FAIL("Hash Table insert failure."); }

Expand Down Expand Up @@ -488,6 +498,7 @@ hash_join::hash_join_impl::~hash_join_impl() = default;

hash_join::hash_join_impl::hash_join_impl(cudf::table_view const &build,
std::vector<size_type> const &build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
: _build(build),
_build_selected(build.select(build_on)),
Expand All @@ -501,8 +512,7 @@ hash_join::hash_join_impl::hash_join_impl(cudf::table_view const &build,

if (_build_on.empty() || 0 == build.num_rows()) { return; }

auto build_table = cudf::table_device_view::create(_build_selected, stream);
_hash_table = build_join_hash_table(*build_table, stream);
_hash_table = build_join_hash_table(_build_selected, compare_nulls, stream);
}

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>>
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,11 @@ struct hash_join::hash_join_impl {
*
* @param build The build table, from which the hash table is built.
* @param build_on The column indices from `build` to join on.
* @param compare_nulls Controls whether null join-key values should match or not.
*/
hash_join_impl(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> inner_join(
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/join/join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::unique_ptr<table> inner_join(
// building/probing the hash map. Because building is typically more expensive than probing, we
// build the hash map from the smaller table.
if (right.num_rows() > left.num_rows()) {
cudf::hash_join hj_obj(left, left_on, stream);
cudf::hash_join hj_obj(left, left_on, compare_nulls, stream);
auto actual_columns_in_common = columns_in_common;
std::for_each(actual_columns_in_common.begin(), actual_columns_in_common.end(), [](auto& pair) {
std::swap(pair.first, pair.second);
Expand All @@ -66,7 +66,7 @@ std::unique_ptr<table> inner_join(
return cudf::detail::combine_table_pair(std::move(probe_build_pair.second),
std::move(probe_build_pair.first));
} else {
cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
auto probe_build_pair = hj_obj.inner_join(left,
left_on,
columns_in_common,
Expand Down Expand Up @@ -99,7 +99,7 @@ std::unique_ptr<table> left_join(
table_view const left = scatter_columns(matched.second.front(), left_on, left_input);
table_view const right = scatter_columns(matched.second.back(), right_on, right_input);

cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
return hj_obj.left_join(left, left_on, columns_in_common, compare_nulls, stream, mr);
}

Expand All @@ -123,7 +123,7 @@ std::unique_ptr<table> full_join(
table_view const left = scatter_columns(matched.second.front(), left_on, left_input);
table_view const right = scatter_columns(matched.second.back(), right_on, right_input);

cudf::hash_join hj_obj(right, right_on, stream);
cudf::hash_join hj_obj(right, right_on, compare_nulls, stream);
return hj_obj.full_join(left, left_on, columns_in_common, compare_nulls, stream, mr);
}

Expand All @@ -133,8 +133,9 @@ hash_join::~hash_join() = default;

hash_join::hash_join(cudf::table_view const& build,
std::vector<size_type> const& build_on,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
: impl{std::make_unique<const hash_join::hash_join_impl>(build, build_on, stream)}
: impl{std::make_unique<const hash_join::hash_join_impl>(build, build_on, compare_nulls, stream)}
{
}

Expand Down
28 changes: 17 additions & 11 deletions cpp/src/join/join_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <cub/cub.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/table/table_device_view.cuh>

Expand Down Expand Up @@ -71,28 +72,33 @@ constexpr auto remap_sentinel_hash(H hash, S sentinel)
* @param[in,out] multi_map The hash table to be built to insert rows into
* @param[in] hash_build Row hasher for the build table
* @param[in] build_table_num_rows The number of rows in the build table
* @param[in] row_bitmask Bitmask in which bit `i` must be set for row `i` of the input keys
* to be inserted; rows whose bit is cleared are skipped. This is nullptr if nulls are equal,
* in which case every row is inserted.
* @param[out] error Pointer used to set an error code if the insert fails
*/
template <typename multimap_type>
__global__ void build_hash_table(multimap_type multi_map,
row_hash hash_build,
const cudf::size_type build_table_num_rows,
bitmask_type const* row_bitmask,
int* error)
{
cudf::size_type i = threadIdx.x + blockIdx.x * blockDim.x;

while (i < build_table_num_rows) {
// Compute the hash value of this row
auto const row_hash_value = remap_sentinel_hash(hash_build(i), multi_map.get_unused_key());

// Insert the (row hash value, row index) into the map
// using the row hash value to determine the location in the
// hash map where the new pair should be inserted
const auto insert_location =
multi_map.insert(thrust::make_pair(row_hash_value, i), true, row_hash_value);

// If the insert failed, set the error code accordingly
if (multi_map.end() == insert_location) { *error = 1; }
if (!row_bitmask || cudf::bit_is_set(row_bitmask, i)) {
// Compute the hash value of this row
auto const row_hash_value = remap_sentinel_hash(hash_build(i), multi_map.get_unused_key());

// Insert the (row hash value, row index) into the map
// using the row hash value to determine the location in the
// hash map where the new pair should be inserted
auto const insert_location =
multi_map.insert(thrust::make_pair(row_hash_value, i), true, row_hash_value);

// If the insert failed, set the error code accordingly
if (multi_map.end() == insert_location) { *error = 1; }
}
i += blockDim.x * gridDim.x;
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/join/join_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ TEST_F(JoinTest, HashJoinSequentialProbes)

Table t1(std::move(cols1));

cudf::hash_join hash_join(t1, {0, 1});
cudf::hash_join hash_join(t1, {0, 1}, cudf::null_equality::EQUAL);

{
CVector cols0;
Expand Down

0 comments on commit ab8c931

Please sign in to comment.