Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add row bitmask as a detail::hash_join member #10248

Merged
merged 18 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/benchmarks/join/join_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,27 @@ static void BM_join(state_type& state, Join JoinFunc)
[[maybe_unused]] std::vector<cudf::size_type> columns_to_join = {0};

// Benchmark the inner join operation
auto mem_stats_logger = cudf::memory_stats_logger();
if constexpr (std::is_same_v<state_type, benchmark::State> and (not is_conditional)) {
for (auto _ : state) {
cuda_event_timer raii(state, true, rmm::cuda_stream_default);

auto result = JoinFunc(
probe_table, build_table, columns_to_join, columns_to_join, cudf::null_equality::UNEQUAL);
}
state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage();
}
if constexpr (std::is_same_v<state_type, nvbench::state> and (not is_conditional)) {
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = JoinFunc(probe_table,
build_table,
auto result = JoinFunc(build_table,
probe_table,
columns_to_join,
columns_to_join,
cudf::null_equality::UNEQUAL,
stream_view);
});
state.add_element_count(mem_stats_logger.peak_memory_usage(), "Peak Memory");
}

// Benchmark conditional join
Expand Down
84 changes: 75 additions & 9 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,70 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table
return std::make_pair(std::move(empty_probe), std::move(empty_build));
}

/**
* @brief Performs bitwise AND of the bitmasks of columns of `build`. Returns
* a helper struct of row bitmasks and count of valid rows.
*
* @param build Table of columns used to build join hash.
* @param nulls_equal Flag to denote nulls are equal or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @return A helper struct of row bitmask and count of valid rows.
*/
cudf::detail::row_info get_valid_row_info(cudf::table_view const& build,
null_equality const nulls_equal,
rmm::cuda_stream_view stream)
{
cudf::detail::row_info result;
result.num_valid_rows = build.num_rows();
auto [row_bitmask, null_count] = cudf::detail::bitmask_and(build, stream);
if (nulls_equal == cudf::null_equality::UNEQUAL and nullable(build)) {
result.num_valid_rows -= null_count;
}
result.composite_bitmask = std::move(row_bitmask);
return result;
}
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief Fills the hash table based on the given table `build`.
*
* @tparam MultimapType Type of the hash table
*
* @param build Table of columns used to build join hash.
* @param hash_table Build hash table.
* @param nulls_equal Flag to denote nulls are equal or not.
* @param row_bitmask Bitmasks to denote whether a row is valid or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
template <typename MultimapType>
void fill_join_hash_table(cudf::table_view const& build,
MultimapType& hash_table,
null_equality const nulls_equal,
rmm::device_buffer const& row_bitmask,
rmm::cuda_stream_view stream)
{
auto build_table_ptr = cudf::table_device_view::create(build, stream);
auto const build_table_num_rows = build.num_rows();

CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table_num_rows, "Build side table has no rows");

row_hash hash_build{nullate::DYNAMIC{cudf::has_nulls(build)}, *build_table_ptr};
make_pair_function pair_func{hash_build, hash_table.get_empty_key_sentinel()};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func);

if (nulls_equal == cudf::null_equality::EQUAL or (not nullable(build))) {
hash_table.insert(iter, iter + build_table_num_rows, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value());
}
}

/**
* @brief Probes the `hash_table` built from `build_table` for tuples in `probe_table`,
* and returns the output indices of `build_table` and `probe_table` as a combined table.
Expand Down Expand Up @@ -234,9 +298,9 @@ hash_join::hash_join_impl::~hash_join_impl() = default;
hash_join::hash_join_impl::hash_join_impl(cudf::table_view const& build,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
: _is_empty{build.num_rows() == 0},
: _valid_row_info{cudf::detail::get_valid_row_info(build, compare_nulls, stream)},
_nulls_equal{compare_nulls},
_hash_table{compute_hash_table_size(build.num_rows()),
_hash_table{compute_hash_table_size(_valid_row_info.num_valid_rows),
std::numeric_limits<hash_value_type>::max(),
cudf::detail::JoinNoneValue,
stream.value(),
Expand All @@ -252,9 +316,11 @@ hash_join::hash_join_impl::hash_join_impl(cudf::table_view const& build,
build, {}, {}, structs::detail::column_nullability::FORCE);
_build = _flattened_build_table;

if (_is_empty) { return; }
// skip filling if empty
if (_valid_row_info.num_valid_rows == 0) { return; }

cudf::detail::build_join_hash_table(_build, _hash_table, _nulls_equal, stream);
cudf::detail::fill_join_hash_table(
_build, _hash_table, _nulls_equal, _valid_row_info.composite_bitmask, stream);
}

std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
Expand Down Expand Up @@ -296,7 +362,7 @@ std::size_t hash_join::hash_join_impl::inner_join_size(cudf::table_view const& p
CUDF_FUNC_RANGE();

// Return directly if build table is empty
if (_is_empty) { return 0; }
if (_valid_row_info.num_valid_rows == 0) { return 0; }

auto flattened_probe = structs::detail::flatten_nested_columns(
probe, {}, {}, structs::detail::column_nullability::FORCE);
Expand All @@ -320,7 +386,7 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const& pr
CUDF_FUNC_RANGE();

// Trivial left join case - exit early
if (_is_empty) { return probe.num_rows(); }
if (_valid_row_info.num_valid_rows == 0) { return probe.num_rows(); }

auto flattened_probe = structs::detail::flatten_nested_columns(
probe, {}, {}, structs::detail::column_nullability::FORCE);
Expand All @@ -345,7 +411,7 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const& pr
CUDF_FUNC_RANGE();

// Trivial left join case - exit early
if (_is_empty) { return probe.num_rows(); }
if (_valid_row_info.num_valid_rows == 0) { return probe.num_rows(); }

auto flattened_probe = structs::detail::flatten_nested_columns(
probe, {}, {}, structs::detail::column_nullability::FORCE);
Expand Down Expand Up @@ -407,11 +473,11 @@ hash_join::hash_join_impl::probe_join_indices(cudf::table_view const& probe_tabl
rmm::mr::device_memory_resource* mr) const
{
// Trivial left join case - exit early
if (_is_empty and JoinKind != cudf::detail::join_kind::INNER_JOIN) {
if (_valid_row_info.num_valid_rows == 0 and JoinKind != cudf::detail::join_kind::INNER_JOIN) {
return get_trivial_left_join_indices(probe_table, stream, mr);
}

CUDF_EXPECTS(!_is_empty, "Hash table of hash join is null.");
CUDF_EXPECTS(_valid_row_info.num_valid_rows > 0, "Hash table of hash join is null.");

auto build_table_ptr = cudf::table_device_view::create(_build, stream);
auto probe_table_ptr = cudf::table_device_view::create(probe_table, stream);
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
namespace cudf {
namespace detail {

struct row_info {
rmm::device_buffer composite_bitmask;
cudf::size_type num_valid_rows;
};

/**
* @brief Remaps a hash value to a new value if it is equal to the specified sentinel value.
*
Expand Down Expand Up @@ -163,17 +168,17 @@ void build_join_hash_table(cudf::table_view const& build,
rmm::cuda_stream_view stream)
{
auto build_table_ptr = cudf::table_device_view::create(build, stream);
auto const build_table_num_rows{build_table_ptr->num_rows()};

CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table_ptr->num_rows(), "Build side table has no rows");
CUDF_EXPECTS(0 != build_table_num_rows, "Build side table has no rows");

row_hash hash_build{nullate::DYNAMIC{cudf::has_nulls(build)}, *build_table_ptr};
auto const empty_key_sentinel = hash_table.get_empty_key_sentinel();
make_pair_function pair_func{hash_build, empty_key_sentinel};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func);

size_type const build_table_num_rows{build_table_ptr->num_rows()};
if (nulls_equal == cudf::null_equality::EQUAL or (not nullable(build))) {
hash_table.insert(iter, iter + build_table_num_rows, stream.value());
} else {
Expand All @@ -197,7 +202,7 @@ struct hash_join::hash_join_impl {
hash_join_impl& operator=(hash_join_impl&&) = delete;

private:
bool const _is_empty;
cudf::detail::row_info const _valid_row_info;
cudf::null_equality const _nulls_equal;
cudf::table_view _build;
std::vector<std::unique_ptr<cudf::column>> _created_null_columns;
Expand Down