diff --git a/cpp/include/cudf/detail/join.hpp b/cpp/include/cudf/detail/join.hpp index e539b1a34c8..ce32be59983 100644 --- a/cpp/include/cudf/detail/join.hpp +++ b/cpp/include/cudf/detail/join.hpp @@ -36,8 +36,8 @@ template class default_allocator; -namespace cudf::structs::detail { -class flattened_table; +namespace cudf::experimental::row::equality { +class preprocessed_table; } namespace cudf { @@ -77,9 +77,9 @@ struct hash_join { rmm::device_buffer const _composite_bitmask; ///< Bitmask to denote whether a row is valid cudf::null_equality const _nulls_equal; ///< whether to consider nulls as equal cudf::table_view _build; ///< input table to build the hash map - std::unique_ptr - _flattened_build_table; ///< flattened data structures for `_build` - map_type _hash_table; ///< hash table built on `_build` + std::shared_ptr + _preprocessed_build; ///< input table preprocssed for row operators + map_type _hash_table; ///< hash table built on `_build` public: /** @@ -152,21 +152,20 @@ struct hash_join { * i.e. if full join is specified as the join type then left join is called. Behavior * is undefined if the provided `output_size` is smaller than the actual output size. * - * @throw cudf::logic_error if build table is empty and `JoinKind == INNER_JOIN`. - * - * @tparam JoinKind The type of join to be performed. + * @throw cudf::logic_error if build table is empty and `join == INNER_JOIN`. * * @param probe_table Table of probe side columns to join. + * @param join The type of join to be performed. * @param output_size Optional value which allows users to specify the exact output size. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned vectors. * * @return Join output indices vector pair. */ - template std::pair>, std::unique_ptr>> probe_join_indices(cudf::table_view const& probe_table, + join_kind join, std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; @@ -179,10 +178,10 @@ struct hash_join { * @throw cudf::logic_error if the number of columns in build table and probe table do not match. * @throw cudf::logic_error if the column data types in build table and probe table do not match. */ - template std::pair>, std::unique_ptr>> compute_hash_join(cudf::table_view const& probe, + join_kind join, std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index d0bdad73614..d6652bd7a0c 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -47,26 +47,33 @@ namespace { * @brief Calculates the exact size of the join output produced when * joining two tables together. * - * @throw cudf::logic_error if JoinKind is not INNER_JOIN or LEFT_JOIN - * - * @tparam JoinKind The type of join to be performed + * @throw cudf::logic_error if join is not INNER_JOIN or LEFT_JOIN * * @param build_table The right hand table * @param probe_table The left hand table + * @param preprocessed_build shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * build_table + * @param preprocessed_probe shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * probe_table * @param hash_table A hash table built on the build table that maps the index - * of every row to the hash value of that row. - * @param nulls_equal Flag to denote nulls are equal or not. + * of every row to the hash value of that row + * @param join The type of join to be performed + * @param has_nulls Flag to denote if build or probe tables have nested nulls + * @param nulls_equal Flag to denote nulls are equal or not * @param stream CUDA stream used for device memory operations and kernel launches * * @return The exact size of the output of the join operation */ -template -std::size_t compute_join_output_size(table_device_view build_table, - table_device_view probe_table, - cudf::detail::multimap_type const& hash_table, - bool const has_nulls, - cudf::null_equality const nulls_equal, - rmm::cuda_stream_view stream) +std::size_t compute_join_output_size( + table_view const& build_table, + table_view const& probe_table, + std::shared_ptr const& preprocessed_build, + std::shared_ptr const& preprocessed_probe, + cudf::detail::multimap_type const& hash_table, + join_kind join, + bool has_nulls, + cudf::null_equality nulls_equal, + rmm::cuda_stream_view stream) { const size_type build_table_num_rows{build_table.num_rows()}; const size_type probe_table_num_rows{probe_table.num_rows()}; @@ -74,7 +81,7 @@ std::size_t compute_join_output_size(table_device_view build_table, // If the build table is empty, we know exactly how large the output // will be for the different types of joins and can return immediately if (0 == build_table_num_rows) { - switch (JoinKind) { + switch (join) { // Inner join with an empty table will have no output case join_kind::INNER_JOIN: return 0; @@ -87,22 +94,33 @@ std::size_t compute_join_output_size(table_device_view build_table, } auto const probe_nulls = cudf::nullate::DYNAMIC{has_nulls}; - pair_equality equality{probe_table, build_table, probe_nulls, nulls_equal}; - row_hash hash_probe{probe_nulls, probe_table}; + auto const row_hash = cudf::experimental::row::hash::row_hasher{preprocessed_probe}; + auto const hash_probe = row_hash.device_hasher(probe_nulls); auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); - make_pair_function pair_func{hash_probe, empty_key_sentinel}; - - auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + auto const iter = cudf::detail::make_counting_transform_iterator( + 0, make_pair_function{hash_probe, empty_key_sentinel}); + + auto const row_comparator = + cudf::experimental::row::equality::two_table_comparator{preprocessed_probe, preprocessed_build}; + auto const comparator_helper = [&](auto device_comparator) { + pair_equality equality{device_comparator}; + + if (join == join_kind::LEFT_JOIN) { + return hash_table.pair_count_outer( + iter, iter + probe_table_num_rows, equality, stream.value()); + } else { + return hash_table.pair_count(iter, iter + probe_table_num_rows, equality, stream.value()); + } + }; - std::size_t size; - if constexpr (JoinKind == join_kind::LEFT_JOIN) { - size = hash_table.pair_count_outer(iter, iter + probe_table_num_rows, equality, stream.value()); + if (cudf::detail::has_nested_columns(probe_table)) { + auto const device_comparator = row_comparator.equal_to(has_nulls, nulls_equal); + return comparator_helper(device_comparator); } else { - size = hash_table.pair_count(iter, iter + probe_table_num_rows, equality, stream.value()); + auto const device_comparator = row_comparator.equal_to(has_nulls, nulls_equal); + return comparator_helper(device_comparator); } - - return size; } /** @@ -110,39 +128,51 @@ std::size_t compute_join_output_size(table_device_view build_table, * and returns the output indices of `build_table` and `probe_table` as a combined table. * Behavior is undefined if the provided `output_size` is smaller than the actual output size. * - * @tparam JoinKind The type of join to be performed. - * - * @param build_table Table of build side columns to join. - * @param probe_table Table of probe side columns to join. - * @param hash_table Hash table built from `build_table`. - * @param compare_nulls Controls whether null join-key values should match or not. - * @param output_size Optional value which allows users to specify the exact output size. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned vectors. + * @param build_table Table of build side columns to join + * @param probe_table Table of probe side columns to join + * @param preprocessed_build shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * build_table + * @param preprocessed_probe shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * probe_table + * @param hash_table Hash table built from `build_table` + * @param join The type of join to be performed + * @param has_nulls Flag to denote if build or probe tables have nested nulls + * @param compare_nulls Controls whether null join-key values should match or not + * @param output_size Optional value which allows users to specify the exact output size + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vectors * * @return Join output indices vector pair. */ -template std::pair>, std::unique_ptr>> -probe_join_hash_table(cudf::table_device_view build_table, - cudf::table_device_view probe_table, - cudf::detail::multimap_type const& hash_table, - bool has_nulls, - null_equality compare_nulls, - std::optional output_size, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +probe_join_hash_table( + cudf::table_view const& build_table, + cudf::table_view const& probe_table, + std::shared_ptr const& preprocessed_build, + std::shared_ptr const& preprocessed_probe, + cudf::detail::multimap_type const& hash_table, + join_kind join, + bool has_nulls, + null_equality compare_nulls, + std::optional output_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { // Use the output size directly if provided. Otherwise, compute the exact output size - constexpr cudf::detail::join_kind ProbeJoinKind = (JoinKind == cudf::detail::join_kind::FULL_JOIN) - ? cudf::detail::join_kind::LEFT_JOIN - : JoinKind; - - std::size_t const join_size = - output_size ? *output_size - : compute_join_output_size( - build_table, probe_table, hash_table, has_nulls, compare_nulls, stream); + auto const probe_join_type = + (join == cudf::detail::join_kind::FULL_JOIN) ? cudf::detail::join_kind::LEFT_JOIN : join; + + std::size_t const join_size = output_size ? *output_size + : compute_join_output_size(build_table, + probe_table, + preprocessed_build, + preprocessed_probe, + hash_table, + probe_join_type, + has_nulls, + compare_nulls, + stream); // If output size is zero, return immediately if (join_size == 0) { @@ -154,35 +184,57 @@ probe_join_hash_table(cudf::table_device_view build_table, auto right_indices = std::make_unique>(join_size, stream, mr); auto const probe_nulls = cudf::nullate::DYNAMIC{has_nulls}; - pair_equality equality{probe_table, build_table, probe_nulls, compare_nulls}; - row_hash hash_probe{probe_nulls, probe_table}; + auto const row_hash = cudf::experimental::row::hash::row_hasher{preprocessed_probe}; + auto const hash_probe = row_hash.device_hasher(probe_nulls); auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); - make_pair_function pair_func{hash_probe, empty_key_sentinel}; - - auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + auto const iter = cudf::detail::make_counting_transform_iterator( + 0, make_pair_function{hash_probe, empty_key_sentinel}); - const cudf::size_type probe_table_num_rows = probe_table.num_rows(); + cudf::size_type const probe_table_num_rows = probe_table.num_rows(); - auto out1_zip_begin = thrust::make_zip_iterator( + auto const out1_zip_begin = thrust::make_zip_iterator( thrust::make_tuple(thrust::make_discard_iterator(), left_indices->begin())); - auto out2_zip_begin = thrust::make_zip_iterator( + auto const out2_zip_begin = thrust::make_zip_iterator( thrust::make_tuple(thrust::make_discard_iterator(), right_indices->begin())); - if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN or - JoinKind == cudf::detail::join_kind::LEFT_JOIN) { - [[maybe_unused]] auto [out1_zip_end, out2_zip_end] = hash_table.pair_retrieve_outer( - iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); - - if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { - auto const actual_size = out1_zip_end - out1_zip_begin; - left_indices->resize(actual_size, stream); - right_indices->resize(actual_size, stream); + auto const row_comparator = + cudf::experimental::row::equality::two_table_comparator{preprocessed_probe, preprocessed_build}; + auto const comparator_helper = [&](auto device_comparator) { + pair_equality equality{device_comparator}; + + if (join == cudf::detail::join_kind::FULL_JOIN or join == cudf::detail::join_kind::LEFT_JOIN) { + [[maybe_unused]] auto [out1_zip_end, out2_zip_end] = + hash_table.pair_retrieve_outer(iter, + iter + probe_table_num_rows, + out1_zip_begin, + out2_zip_begin, + equality, + stream.value()); + + if (join == cudf::detail::join_kind::FULL_JOIN) { + auto const actual_size = thrust::distance(out1_zip_begin, out1_zip_end); + left_indices->resize(actual_size, stream); + right_indices->resize(actual_size, stream); + } + } else { + hash_table.pair_retrieve(iter, + iter + probe_table_num_rows, + out1_zip_begin, + out2_zip_begin, + equality, + stream.value()); } + }; + + if (cudf::detail::has_nested_columns(probe_table)) { + auto const device_comparator = row_comparator.equal_to(probe_nulls, compare_nulls); + comparator_helper(device_comparator); } else { - hash_table.pair_retrieve( - iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); + auto const device_comparator = row_comparator.equal_to(probe_nulls, compare_nulls); + comparator_helper(device_comparator); } + return std::pair(std::move(left_indices), std::move(right_indices)); } @@ -192,25 +244,40 @@ probe_join_hash_table(cudf::table_device_view build_table, * TODO: this is a temporary solution as part of `full_join_size`. To be refactored during * cuco integration. * - * @param build_table Table of build side columns to join. - * @param probe_table Table of probe side columns to join. - * @param hash_table Hash table built from `build_table`. - * @param compare_nulls Controls whether null join-key values should match or not. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the intermediate vectors. + * @param build_table Table of build side columns to join + * @param probe_table Table of probe side columns to join + * @param preprocessed_build shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * build_table + * @param preprocessed_probe shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * probe_table + * @param hash_table Hash table built from `build_table` + * @param has_nulls Flag to denote if build or probe tables have nested nulls + * @param compare_nulls Controls whether null join-key values should match or not + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the intermediate vectors * * @return Output size of full join. */ -std::size_t get_full_join_size(cudf::table_device_view build_table, - cudf::table_device_view probe_table, - cudf::detail::multimap_type const& hash_table, - bool const has_nulls, - null_equality const compare_nulls, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +std::size_t get_full_join_size( + cudf::table_view const& build_table, + cudf::table_view const& probe_table, + std::shared_ptr const& preprocessed_build, + std::shared_ptr const& preprocessed_probe, + cudf::detail::multimap_type const& hash_table, + bool has_nulls, + null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - std::size_t join_size = compute_join_output_size( - build_table, probe_table, hash_table, has_nulls, compare_nulls, stream); + std::size_t join_size = compute_join_output_size(build_table, + probe_table, + preprocessed_build, + preprocessed_probe, + hash_table, + cudf::detail::join_kind::LEFT_JOIN, + has_nulls, + compare_nulls, + stream); // If output size is zero, return immediately if (join_size == 0) { return join_size; } @@ -219,23 +286,34 @@ std::size_t get_full_join_size(cudf::table_device_view build_table, auto right_indices = std::make_unique>(join_size, stream, mr); auto const probe_nulls = cudf::nullate::DYNAMIC{has_nulls}; - pair_equality equality{probe_table, build_table, probe_nulls, compare_nulls}; - row_hash hash_probe{probe_nulls, probe_table}; + auto const row_hash = cudf::experimental::row::hash::row_hasher{preprocessed_probe}; + auto const hash_probe = row_hash.device_hasher(probe_nulls); auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); - make_pair_function pair_func{hash_probe, empty_key_sentinel}; - - auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + auto const iter = cudf::detail::make_counting_transform_iterator( + 0, make_pair_function{hash_probe, empty_key_sentinel}); - const cudf::size_type probe_table_num_rows = probe_table.num_rows(); + cudf::size_type const probe_table_num_rows = probe_table.num_rows(); - auto out1_zip_begin = thrust::make_zip_iterator( + auto const out1_zip_begin = thrust::make_zip_iterator( thrust::make_tuple(thrust::make_discard_iterator(), left_indices->begin())); - auto out2_zip_begin = thrust::make_zip_iterator( + auto const out2_zip_begin = thrust::make_zip_iterator( thrust::make_tuple(thrust::make_discard_iterator(), right_indices->begin())); - hash_table.pair_retrieve_outer( - iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); + auto const row_comparator = + cudf::experimental::row::equality::two_table_comparator{preprocessed_probe, preprocessed_build}; + auto const comparator_helper = [&](auto device_comparator) { + pair_equality equality{device_comparator}; + hash_table.pair_retrieve_outer( + iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); + }; + if (cudf::detail::has_nested_columns(probe_table)) { + auto const device_comparator = row_comparator.equal_to(probe_nulls, compare_nulls); + comparator_helper(device_comparator); + } else { + auto const device_comparator = row_comparator.equal_to(probe_nulls, compare_nulls); + comparator_helper(device_comparator); + } // Release intermediate memory allocation left_indices->resize(0, stream); @@ -291,26 +369,20 @@ hash_join::hash_join(cudf::table_view const& build, cuco::empty_key{std::numeric_limits::max()}, cuco::empty_value{cudf::detail::JoinNoneValue}, stream.value(), - detail::hash_table_allocator_type{default_allocator{}, stream}} + detail::hash_table_allocator_type{default_allocator{}, stream}}, + _build{build}, + _preprocessed_build{ + cudf::experimental::row::equality::preprocessed_table::create(_build, stream)} { CUDF_FUNC_RANGE(); CUDF_EXPECTS(0 != build.num_columns(), "Hash join build table is empty"); CUDF_EXPECTS(build.num_rows() < cudf::detail::MAX_JOIN_SIZE, "Build column size is too big for hash join"); - // need to store off the owning structures for some of the views in _build - _flattened_build_table = - structs::detail::flatten_nested_columns(build, - {}, - {}, - structs::detail::column_nullability::FORCE, - stream, - rmm::mr::get_current_device_resource()); - _build = _flattened_build_table->flattened_columns(); - if (_is_empty) { return; } cudf::detail::build_join_hash_table(_build, + _preprocessed_build, _hash_table, _nulls_equal, static_cast(_composite_bitmask.data()), @@ -326,7 +398,7 @@ hash_join::inner_join(cudf::table_view const& probe, rmm::mr::device_memory_resource* mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, output_size, stream, mr); + return compute_hash_join(probe, cudf::detail::join_kind::INNER_JOIN, output_size, stream, mr); } template @@ -338,7 +410,7 @@ hash_join::left_join(cudf::table_view const& probe, rmm::mr::device_memory_resource* mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, output_size, stream, mr); + return compute_hash_join(probe, cudf::detail::join_kind::LEFT_JOIN, output_size, stream, mr); } template @@ -350,7 +422,7 @@ hash_join::full_join(cudf::table_view const& probe, rmm::mr::device_memory_resource* mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, output_size, stream, mr); + return compute_hash_join(probe, cudf::detail::join_kind::FULL_JOIN, output_size, stream, mr); } template @@ -362,23 +434,17 @@ std::size_t hash_join::inner_join_size(cudf::table_view const& probe, // Return directly if build table is empty if (_is_empty) { return 0; } - auto flattened_probe = - structs::detail::flatten_nested_columns(probe, - {}, - {}, - structs::detail::column_nullability::FORCE, - stream, - rmm::mr::get_current_device_resource()); - auto const flattened_probe_table = flattened_probe->flattened_columns(); - - auto build_table_ptr = cudf::table_device_view::create(_build, stream); - auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); - - return cudf::detail::compute_join_output_size( - *build_table_ptr, - *flattened_probe_table_ptr, + auto const preprocessed_probe = + cudf::experimental::row::equality::preprocessed_table::create(probe, stream); + + return cudf::detail::compute_join_output_size( + _build, + probe, + _preprocessed_build, + preprocessed_probe, _hash_table, - cudf::has_nulls(flattened_probe_table) | cudf::has_nulls(_build), + cudf::detail::join_kind::INNER_JOIN, + cudf::has_nested_nulls(probe) || cudf::has_nested_nulls(_build), _nulls_equal, stream); } @@ -392,23 +458,17 @@ std::size_t hash_join::left_join_size(cudf::table_view const& probe, // Trivial left join case - exit early if (_is_empty) { return probe.num_rows(); } - auto flattened_probe = - structs::detail::flatten_nested_columns(probe, - {}, - {}, - structs::detail::column_nullability::FORCE, - stream, - rmm::mr::get_current_device_resource()); - auto const flattened_probe_table = flattened_probe->flattened_columns(); - - auto build_table_ptr = cudf::table_device_view::create(_build, stream); - auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); - - return cudf::detail::compute_join_output_size( - *build_table_ptr, - *flattened_probe_table_ptr, + auto const preprocessed_probe = + cudf::experimental::row::equality::preprocessed_table::create(probe, stream); + + return cudf::detail::compute_join_output_size( + _build, + probe, + _preprocessed_build, + preprocessed_probe, _hash_table, - cudf::has_nulls(flattened_probe_table) | cudf::has_nulls(_build), + cudf::detail::join_kind::LEFT_JOIN, + cudf::has_nested_nulls(probe) || cudf::has_nested_nulls(_build), _nulls_equal, stream); } @@ -423,58 +483,53 @@ std::size_t hash_join::full_join_size(cudf::table_view const& probe, // Trivial left join case - exit early if (_is_empty) { return probe.num_rows(); } - auto flattened_probe = - structs::detail::flatten_nested_columns(probe, - {}, - {}, - structs::detail::column_nullability::FORCE, - stream, - rmm::mr::get_current_device_resource()); - auto const flattened_probe_table = flattened_probe->flattened_columns(); - - auto build_table_ptr = cudf::table_device_view::create(_build, stream); - auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); + auto const preprocessed_probe = + cudf::experimental::row::equality::preprocessed_table::create(probe, stream); return cudf::detail::get_full_join_size( - *build_table_ptr, - *flattened_probe_table_ptr, + _build, + probe, + _preprocessed_build, + preprocessed_probe, _hash_table, - cudf::has_nulls(flattened_probe_table) | cudf::has_nulls(_build), + cudf::has_nested_nulls(probe) || cudf::has_nested_nulls(_build), _nulls_equal, stream, mr); } template -template std::pair>, std::unique_ptr>> hash_join::probe_join_indices(cudf::table_view const& probe_table, + cudf::detail::join_kind join, std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const { // Trivial left join case - exit early - if (_is_empty and JoinKind != cudf::detail::join_kind::INNER_JOIN) { + if (_is_empty and join != cudf::detail::join_kind::INNER_JOIN) { return get_trivial_left_join_indices(probe_table, stream, mr); } CUDF_EXPECTS(!_is_empty, "Hash table of hash join is null."); - auto build_table_ptr = cudf::table_device_view::create(_build, stream); - auto probe_table_ptr = cudf::table_device_view::create(probe_table, stream); - - auto join_indices = cudf::detail::probe_join_hash_table( - *build_table_ptr, - *probe_table_ptr, + auto const preprocessed_probe = + cudf::experimental::row::equality::preprocessed_table::create(probe_table, stream); + auto join_indices = cudf::detail::probe_join_hash_table( + _build, + probe_table, + _preprocessed_build, + preprocessed_probe, _hash_table, - cudf::has_nulls(probe_table) | cudf::has_nulls(_build), + join, + cudf::has_nested_nulls(probe_table) || cudf::has_nested_nulls(_build), _nulls_equal, output_size, stream, mr); - if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { + if (join == cudf::detail::join_kind::FULL_JOIN) { auto complement_indices = detail::get_left_join_indices_complement( join_indices.second, probe_table.num_rows(), _build.num_rows(), stream, mr); join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); @@ -483,10 +538,10 @@ hash_join::probe_join_indices(cudf::table_view const& probe_table, } template -template std::pair>, std::unique_ptr>> hash_join::compute_hash_join(cudf::table_view const& probe, + cudf::detail::join_kind join, std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const @@ -495,31 +550,22 @@ hash_join::compute_hash_join(cudf::table_view const& probe, CUDF_EXPECTS(probe.num_rows() < cudf::detail::MAX_JOIN_SIZE, "Probe column size is too big for hash join"); - auto flattened_probe = - structs::detail::flatten_nested_columns(probe, - {}, - {}, - structs::detail::column_nullability::FORCE, - stream, - rmm::mr::get_current_device_resource()); - auto const flattened_probe_table = flattened_probe->flattened_columns(); - - CUDF_EXPECTS(_build.num_columns() == flattened_probe_table.num_columns(), + CUDF_EXPECTS(_build.num_columns() == probe.num_columns(), "Mismatch in number of columns to be joined on"); - if (is_trivial_join(flattened_probe_table, _build, JoinKind)) { + if (is_trivial_join(probe, _build, join)) { return std::pair(std::make_unique>(0, stream, mr), std::make_unique>(0, stream, mr)); } CUDF_EXPECTS(std::equal(std::cbegin(_build), std::cend(_build), - std::cbegin(flattened_probe_table), - std::cend(flattened_probe_table), + std::cbegin(probe), + std::cend(probe), [](const auto& b, const auto& p) { return b.type() == p.type(); }), "Mismatch in joining column data types"); - return probe_join_indices(flattened_probe_table, output_size, stream, mr); + return probe_join_indices(probe, join, output_size, stream, mr); } } // namespace detail diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh index bc4c62291b2..5daf9988768 100644 --- a/cpp/src/join/join_common_utils.cuh +++ b/cpp/src/join/join_common_utils.cuh @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -101,27 +102,31 @@ class row_is_valid { * * @tparam Comparator The row comparator type to perform row equality comparison from row indices. */ -template +template class pair_equality { public: - pair_equality(table_device_view lhs, - table_device_view rhs, - nullate::DYNAMIC has_nulls, - null_equality nulls_are_equal = null_equality::EQUAL) - : _check_row_equality{has_nulls, lhs, rhs, nulls_are_equal} + pair_equality(DeviceComparator check_row_equality) + : _check_row_equality{std::move(check_row_equality)} { } - pair_equality(Comparator const d_eqcomp) : _check_row_equality{std::move(d_eqcomp)} {} - + // The parameters are build/probe rather than left/right because the operator + // is called by cuco's kernels with parameters in this order (note that this + // is an implementation detail that we should eventually stop relying on by + // defining operators with suitable heterogeneous typing). Rather than + // converting to left/right semantics, we can operate directly on build/probe template __device__ __forceinline__ bool operator()(LhsPair const& lhs, RhsPair const& rhs) const noexcept { - return lhs.first == rhs.first and _check_row_equality(rhs.second, lhs.second); + using experimental::row::lhs_index_type; + using experimental::row::rhs_index_type; + + return lhs.first == rhs.first and + _check_row_equality(lhs_index_type{rhs.second}, rhs_index_type{lhs.second}); } private: - Comparator _check_row_equality; + DeviceComparator _check_row_equality; }; /** @@ -150,6 +155,8 @@ get_trivial_left_join_indices(table_view const& left, * @tparam MultimapType The type of the hash table * * @param build Table of columns used to build join hash. + * @param preprocessed_build shared_ptr to cudf::experimental::row::equality::preprocessed_table for + * build * @param hash_table Build hash table. * @param nulls_equal Flag to denote nulls are equal or not. * @param bitmask Bitmask to denote whether a row is valid. @@ -157,24 +164,26 @@ get_trivial_left_join_indices(table_view const& left, * */ template -void build_join_hash_table(cudf::table_view const& build, - MultimapType& hash_table, - null_equality const nulls_equal, - [[maybe_unused]] bitmask_type const* bitmask, - rmm::cuda_stream_view stream) +void build_join_hash_table( + cudf::table_view const& build, + std::shared_ptr const& preprocessed_build, + MultimapType& hash_table, + null_equality nulls_equal, + [[maybe_unused]] bitmask_type const* bitmask, + rmm::cuda_stream_view stream) { - auto build_table_ptr = cudf::table_device_view::create(build, stream); + CUDF_EXPECTS(0 != build.num_columns(), "Selected build dataset is empty"); + CUDF_EXPECTS(0 != build.num_rows(), "Build side table has no rows"); - CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty"); - CUDF_EXPECTS(0 != build_table_ptr->num_rows(), "Build side table has no rows"); + auto const row_hash = experimental::row::hash::row_hasher{preprocessed_build}; + auto const hash_build = row_hash.device_hasher(nullate::DYNAMIC{cudf::has_nested_nulls(build)}); - row_hash hash_build{nullate::DYNAMIC{cudf::has_nulls(build)}, *build_table_ptr}; auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); make_pair_function pair_func{hash_build, empty_key_sentinel}; - auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + auto const iter = cudf::detail::make_counting_transform_iterator(0, pair_func); - size_type const build_table_num_rows{build_table_ptr->num_rows()}; + size_type const build_table_num_rows{build.num_rows()}; if (nulls_equal == cudf::null_equality::EQUAL or (not nullable(build))) { hash_table.insert(iter, iter + build_table_num_rows, stream.value()); } else { diff --git a/cpp/src/join/mixed_join.cu b/cpp/src/join/mixed_join.cu index 8d66cba8f8d..26200e41d5f 100644 --- a/cpp/src/join/mixed_join.cu +++ b/cpp/src/join/mixed_join.cu @@ -141,8 +141,14 @@ mixed_join( // won't be able to support AST conditions for those types anyway. auto const row_bitmask = cudf::detail::bitmask_and(build, stream, rmm::mr::get_current_device_resource()).first; - build_join_hash_table( - build, hash_table, compare_nulls, static_cast(row_bitmask.data()), stream); + auto const preprocessed_build = + experimental::row::equality::preprocessed_table::create(build, stream); + build_join_hash_table(build, + preprocessed_build, + hash_table, + compare_nulls, + static_cast(row_bitmask.data()), + stream); auto hash_table_view = hash_table.get_device_view(); auto left_conditional_view = table_device_view::create(left_conditional, stream); @@ -390,8 +396,14 @@ compute_mixed_join_output_size(table_view const& left_equality, // won't be able to support AST conditions for those types anyway. auto const row_bitmask = cudf::detail::bitmask_and(build, stream, rmm::mr::get_current_device_resource()).first; - build_join_hash_table( - build, hash_table, compare_nulls, static_cast(row_bitmask.data()), stream); + auto const preprocessed_build = + experimental::row::equality::preprocessed_table::create(build, stream); + build_join_hash_table(build, + preprocessed_build, + hash_table, + compare_nulls, + static_cast(row_bitmask.data()), + stream); auto hash_table_view = hash_table.get_device_view(); auto left_conditional_view = table_device_view::create(left_conditional, stream); diff --git a/cpp/tests/join/join_tests.cpp b/cpp/tests/join/join_tests.cpp index 404ff7d8380..4062c3bd921 100644 --- a/cpp/tests/join/join_tests.cpp +++ b/cpp/tests/join/join_tests.cpp @@ -1882,4 +1882,151 @@ TEST_F(JoinTest, Repro_StructsWithoutNullsPushedDown) CUDF_TEST_EXPECT_TABLES_EQUIVALENT(superimposed_results, expected); } +using lcw = cudf::test::lists_column_wrapper; +using cudf::test::iterators::null_at; + +struct JoinTestLists : public cudf::test::BaseFixture { + /* + [ + NULL, 0 + [1], 1 + [2, NULL], 2 + [], 3 + [5, 6] 4 + */ + lcw build{{{0}, {1}, {{2, 0}, null_at(1)}, {}, {5, 6}}, null_at(0)}; + + /* + [ + [1], 0 + [3], 1 + NULL, 2 + [], 3 + [2, NULL], 4 + [5], 5 + [6] 6 + ] + */ + lcw probe{{{1}, {3}, {0}, {}, {{2, 0}, null_at(1)}, {5}, {6}}, null_at(2)}; + + auto column_view_from_device_uvector(rmm::device_uvector const& vector) + { + auto const indices_span = cudf::device_span{vector}; + return cudf::column_view{indices_span}; + } + + auto sort_and_gather( + cudf::table_view table, + cudf::column_view gather_map, + cudf::out_of_bounds_policy oob_policy = cudf::out_of_bounds_policy::DONT_CHECK) + { + auto const gather_table = cudf::gather(table, gather_map, oob_policy); + auto const sort_order = cudf::sorted_order(*gather_table); + return cudf::gather(*gather_table, *sort_order); + } + + template + void join(cudf::column_view left_gold_map, + cudf::column_view right_gold_map, + cudf::null_equality nulls_equal, + JoinFunc join_func, + cudf::out_of_bounds_policy oob_policy) + { + auto const build_tv = cudf::table_view{{build}}; + auto const probe_tv = cudf::table_view{{probe}}; + + auto const [left_result_map, right_result_map] = + join_func(build_tv, probe_tv, nulls_equal, rmm::mr::get_current_device_resource()); + + auto const left_result_table = + sort_and_gather(build_tv, column_view_from_device_uvector(*left_result_map), oob_policy); + auto const right_result_table = + sort_and_gather(probe_tv, column_view_from_device_uvector(*right_result_map), oob_policy); + + auto const left_gold_table = sort_and_gather(build_tv, left_gold_map, oob_policy); + auto const right_gold_table = sort_and_gather(probe_tv, right_gold_map, oob_policy); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*left_result_table, *left_gold_table); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*right_result_table, *right_gold_table); + } + + void inner_join(cudf::column_view left_gold_map, + cudf::column_view right_gold_map, + cudf::null_equality nulls_equal) + { + join(left_gold_map, + right_gold_map, + nulls_equal, + cudf::inner_join, + cudf::out_of_bounds_policy::DONT_CHECK); + } + + void full_join(cudf::column_view left_gold_map, + cudf::column_view right_gold_map, + cudf::null_equality nulls_equal) + { + join(left_gold_map, + right_gold_map, + nulls_equal, + cudf::full_join, + cudf::out_of_bounds_policy::NULLIFY); + } + + void left_join(cudf::column_view left_gold_map, + cudf::column_view right_gold_map, + cudf::null_equality nulls_equal) + { + join(left_gold_map, + right_gold_map, + nulls_equal, + cudf::left_join, + cudf::out_of_bounds_policy::NULLIFY); + } +}; + +TEST_F(JoinTestLists, ListWithNullsEqualInnerJoin) +{ + auto const left_gold_map = column_wrapper({0, 1, 2, 3}); + auto const right_gold_map = column_wrapper({0, 2, 3, 4}); + this->inner_join(left_gold_map, right_gold_map, cudf::null_equality::EQUAL); +} + +TEST_F(JoinTestLists, ListWithNullsUnequalInnerJoin) +{ + auto const left_gold_map = column_wrapper({1, 3}); + auto const right_gold_map = column_wrapper({0, 3}); + this->inner_join(left_gold_map, right_gold_map, cudf::null_equality::UNEQUAL); +} + +TEST_F(JoinTestLists, ListWithNullsEqualFullJoin) +{ + auto const left_gold_map = + column_wrapper({0, 1, 2, 3, 4, NoneValue, NoneValue, NoneValue}); + auto const right_gold_map = column_wrapper({2, 0, 4, 3, NoneValue, 1, 5, 6}); + this->full_join(left_gold_map, right_gold_map, cudf::null_equality::EQUAL); +} + +TEST_F(JoinTestLists, ListWithNullsUnequalFullJoin) +{ + auto const left_gold_map = + column_wrapper({0, 1, 2, 3, 4, NoneValue, NoneValue, NoneValue, NoneValue, NoneValue}); + auto const right_gold_map = + column_wrapper({NoneValue, 0, NoneValue, 3, NoneValue, 1, 5, 6, 2, 4}); + this->full_join(left_gold_map, right_gold_map, cudf::null_equality::UNEQUAL); +} + +TEST_F(JoinTestLists, ListWithNullsEqualLeftJoin) +{ + auto const left_gold_map = column_wrapper({0, 1, 2, 3, 4}); + auto const right_gold_map = column_wrapper({2, 0, 4, 3, NoneValue}); + this->left_join(left_gold_map, right_gold_map, cudf::null_equality::EQUAL); +} + +TEST_F(JoinTestLists, ListWithNullsUnequalLeftJoin) +{ + auto const left_gold_map = column_wrapper({0, 1, 2, 3, 4}); + auto const right_gold_map = column_wrapper({NoneValue, 0, NoneValue, 3, NoneValue}); + this->left_join(left_gold_map, right_gold_map, cudf::null_equality::UNEQUAL); +} + CUDF_TEST_PROGRAM_MAIN()