Skip to content

Commit

Permalink
Replace cudf's concurrent_ordered_map with cuco::static_map in semi/a…
Browse files Browse the repository at this point in the history
…nti joins (#9666)

This PR resolves #9586, replacing the hash table used in semi and anti joins with cuco::static_map. It depends on NVIDIA/cuCollections#118. At present the code is slower than the original version, so we'll probably want to make some optimizations in cuco before merging this.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Conor Hoekstra (https://github.com/codereport)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9666
  • Loading branch information
vyasr authored Jan 5, 2022
1 parent 3e893a6 commit 2112757
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 60 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_cucollections.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function(find_and_configure_cucollections)
cuco 0.0
GLOBAL_TARGETS cuco::cuco
CPM_ARGS GITHUB_REPOSITORY NVIDIA/cuCollections
GIT_TAG 6433e8ad7571f14cc5384051b049029c60dd1ce0
GIT_TAG 193de1aa74f5721717f991ca757dc610c852bb17
OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF"
)

Expand Down
16 changes: 0 additions & 16 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,6 @@ namespace detail {

namespace {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

} // anonymous namespace

std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table(
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@
namespace cudf {
namespace detail {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

/**
* @brief Device functor to determine if two pairs are identical.
*/
Expand Down
105 changes: 62 additions & 43 deletions cpp/src/join/semi_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

#include <hash/concurrent_unordered_map.cuh>
#include <join/join_common_utils.cuh>
#include <join/join_common_utils.hpp>

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/structs/utilities.hpp>
Expand All @@ -34,10 +35,28 @@
#include <thrust/copy.h>
#include <thrust/distance.h>
#include <thrust/sequence.h>
#include <thrust/tuple.h>

#include <cuco/static_map.cuh>

namespace cudf {
namespace detail {

namespace {
/**
* @brief Device functor to create a pair of hash value and index for a given row.
*/
struct make_pair_function {
__device__ __forceinline__ cudf::detail::pair_type operator()(size_type i) const noexcept
{
// The value is irrelevant since we only ever use the hash map to check for
// membership of a particular row index.
return cuco::make_pair<hash_value_type, size_type>(i, 0);
}
};

} // namespace

std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
join_kind const kind,
cudf::table_view const& left_keys,
Expand Down Expand Up @@ -71,67 +90,67 @@ std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
auto right_flattened_keys = right_flattened_tables.flattened_columns();
auto left_flattened_keys = left_flattened_tables.flattened_columns();

// Only care about existence, so we'll use an unordered map (other joins need a multimap)
using hash_table_type = concurrent_unordered_map<cudf::size_type, bool, row_hash, row_equality>;
// Create hash table.
auto hash_table = cuco::
static_map<hash_value_type, size_type, cuda::thread_scope_device, hash_table_allocator_type>{
compute_hash_table_size(right_num_rows),
std::numeric_limits<hash_value_type>::max(),
cudf::detail::JoinNoneValue,
hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

// Create hash table containing all keys found in right table
auto right_rows_d = table_device_view::create(right_flattened_keys, stream);
size_t const hash_table_size = compute_hash_table_size(right_num_rows);
auto const right_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(right_flattened_keys)};
row_hash hash_build{right_nulls, *right_rows_d};
auto right_rows_d = table_device_view::create(right_flattened_keys, stream);
auto const right_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(right_flattened_keys)};
row_hash const hash_build{right_nulls, *right_rows_d};
row_equality equality_build{right_nulls, *right_rows_d, *right_rows_d, compare_nulls};
make_pair_function pair_func_build{};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func_build);

// Going to join it with left table
auto left_rows_d = table_device_view::create(left_flattened_keys, stream);
auto const left_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(left_flattened_keys)};
row_hash hash_probe{left_nulls, *left_rows_d};
row_equality equality_probe{left_nulls, *left_rows_d, *right_rows_d, compare_nulls};

auto hash_table_ptr = hash_table_type::create(hash_table_size,
stream,
std::numeric_limits<bool>::max(),
std::numeric_limits<cudf::size_type>::max(),
hash_build,
equality_build);
auto hash_table = *hash_table_ptr;

// if compare_nulls == UNEQUAL, we can simply ignore any rows that
// contain a NULL in any column as they will never compare to equal.
auto const row_bitmask = (compare_nulls == null_equality::EQUAL)
? rmm::device_buffer{}
: cudf::detail::bitmask_and(right_flattened_keys, stream).first;
// skip rows that are null here.
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
right_num_rows,
[hash_table, row_bitmask = static_cast<bitmask_type const*>(row_bitmask.data())] __device__(
size_type idx) mutable {
if (!row_bitmask || cudf::bit_is_set(row_bitmask, idx)) {
hash_table.insert(thrust::make_pair(idx, true));
}
});
if ((compare_nulls == null_equality::EQUAL) or (not nullable(right_keys))) {
hash_table.insert(iter, iter + right_num_rows, hash_build, equality_build, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const [row_bitmask, _] = cudf::detail::bitmask_and(right_flattened_keys, stream);
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(
iter, iter + right_num_rows, stencil, pred, hash_build, equality_build, stream.value());
}

//
// Now we have a hash table, we need to iterate over the rows of the left table
// and check to see if they are contained in the hash table
//
auto left_rows_d = table_device_view::create(left_flattened_keys, stream);
auto const left_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(left_flattened_keys)};
row_hash hash_probe{left_nulls, *left_rows_d};
// Note: This equality comparator violates symmetry of equality and is
// therefore relying on the implementation detail of the order in which its
// operator is invoked. If cuco makes no promises about the order of
// invocation this seems a bit unsafe.
row_equality equality_probe{left_nulls, *right_rows_d, *left_rows_d, compare_nulls};

// For semi join we want contains to be true, for anti join we want contains to be false
bool const join_type_boolean = (kind == join_kind::LEFT_SEMI_JOIN);

auto hash_table_view = hash_table.get_device_view();

auto gather_map =
std::make_unique<rmm::device_uvector<cudf::size_type>>(left_num_rows, stream, mr);

// gather_map_end will be the end of valid data in gather_map
auto gather_map_end = thrust::copy_if(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(left_num_rows),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(left_num_rows),
gather_map->begin(),
[hash_table, join_type_boolean, hash_probe, equality_probe] __device__(size_type idx) {
auto pos = hash_table.find(idx, hash_probe, equality_probe);
return (pos != hash_table.end()) == join_type_boolean;
[hash_table_view, join_type_boolean, hash_probe, equality_probe] __device__(
size_type const idx) {
// Look up this row. The hash function used here needs to map a (left) row index to the hash
// of the row, so it's a row hash. The equality check needs to verify
return hash_table_view.contains(idx, hash_probe, equality_probe) == join_type_boolean;
});

auto join_size = thrust::distance(gather_map->begin(), gather_map_end);
Expand Down
15 changes: 15 additions & 0 deletions cpp/tests/join/semi_anti_join_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ using Table = cudf::table;
struct JoinTest : public cudf::test::BaseFixture {
};

TEST_F(JoinTest, TestSimple)
{
column_wrapper<int32_t> left_col0{0, 1, 2};
column_wrapper<int32_t> right_col0{0, 1, 3};

auto left = cudf::table_view{{left_col0}};
auto right = cudf::table_view{{right_col0}};

auto result = cudf::left_semi_join(left, right);
auto result_cv = cudf::column_view(
cudf::data_type{cudf::type_to_id<cudf::size_type>()}, result->size(), result->data());
column_wrapper<cudf::size_type> expected{0, 1};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result_cv);
};

std::pair<std::unique_ptr<cudf::table>, std::unique_ptr<cudf::table>> get_saj_tables(
std::vector<bool> const& left_is_human_nulls, std::vector<bool> const& right_is_human_nulls)
{
Expand Down

0 comments on commit 2112757

Please sign in to comment.