diff --git a/cpp/benchmarks/join/generate_input_tables.cuh b/cpp/benchmarks/join/generate_input_tables.cuh index d7f64716e58..e846317f472 100644 --- a/cpp/benchmarks/join/generate_input_tables.cuh +++ b/cpp/benchmarks/join/generate_input_tables.cuh @@ -41,17 +41,12 @@ __global__ static void init_curand(curandState* state, const int nstates) template __global__ static void init_build_tbl(key_type* const build_tbl, const size_type build_tbl_size, - const key_type rand_max, - const bool uniq_build_tbl_keys, - key_type* const lottery, - const size_type lottery_size, + const int multiplicity, curandState* state, const int num_states) { - static_assert(std::is_signed::value, "key_type needs to be signed for lottery to work"); - - const int start_idx = blockIdx.x * blockDim.x + threadIdx.x; - const key_type stride = blockDim.x * gridDim.x; + auto const start_idx = blockIdx.x * blockDim.x + threadIdx.x; + auto const stride = blockDim.x * gridDim.x; assert(start_idx < num_states); curandState localState = state[start_idx]; @@ -59,28 +54,7 @@ __global__ static void init_build_tbl(key_type* const build_tbl, for (size_type idx = start_idx; idx < build_tbl_size; idx += stride) { const double x = curand_uniform_double(&localState); - if (uniq_build_tbl_keys) { - // If the build table keys need to be unique, go through lottery array from lottery_idx until - // finding a key which has not been used (-1). Mark the key as been used by atomically setting - // the spot to -1. - - size_type lottery_idx = x * lottery_size; - key_type lottery_val = -1; - - while (-1 == lottery_val) { - lottery_val = lottery[lottery_idx]; - - if (-1 != lottery_val) { - lottery_val = atomicCAS(lottery + lottery_idx, lottery_val, -1); - } - - lottery_idx = (lottery_idx + 1) % lottery_size; - } - - build_tbl[idx] = lottery_val; - } else { - build_tbl[idx] = x * rand_max; - } + build_tbl[idx] = static_cast(x * (build_tbl_size / multiplicity)); } state[start_idx] = localState; @@ -89,16 +63,15 @@ __global__ static void init_build_tbl(key_type* const build_tbl, template __global__ void init_probe_tbl(key_type* const probe_tbl, const size_type probe_tbl_size, - const key_type* const build_tbl, const size_type build_tbl_size, - const key_type* const lottery, - const size_type lottery_size, + const key_type rand_max, const double selectivity, + const int multiplicity, curandState* state, const int num_states) { - const int start_idx = blockIdx.x * blockDim.x + threadIdx.x; - const size_type stride = blockDim.x * gridDim.x; + auto const start_idx = blockIdx.x * blockDim.x + threadIdx.x; + auto const stride = blockDim.x * gridDim.x; assert(start_idx < num_states); curandState localState = state[start_idx]; @@ -109,21 +82,15 @@ __global__ void init_probe_tbl(key_type* const probe_tbl, if (x <= selectivity) { // x <= selectivity means this key in the probe table should be present in the build table, so - // we pick a key from build_tbl - x = curand_uniform_double(&localState); - size_type build_tbl_idx = x * build_tbl_size; - - if (build_tbl_idx >= build_tbl_size) { build_tbl_idx = build_tbl_size - 1; } - - val = build_tbl[build_tbl_idx]; + // we pick a key from [0, build_tbl_size / multiplicity] + x = curand_uniform_double(&localState); + val = static_cast(x * (build_tbl_size / multiplicity)); } else { // This key in the probe table should not be present in the build table, so we pick a key from - // lottery. - x = curand_uniform_double(&localState); - size_type lottery_idx = x * lottery_size; - val = lottery[lottery_idx]; + // [build_tbl_size, rand_max]. + x = curand_uniform_double(&localState); + val = static_cast(x * (rand_max - build_tbl_size) + build_tbl_size); } - probe_tbl[idx] = val; } @@ -152,9 +119,7 @@ __global__ void init_probe_tbl(key_type* const probe_tbl, * @param[in] build_tbl_size number of keys in the build table * @param[in] selectivity probability with which an element of the probe table is * present in the build table. - * @param[in] rand_max maximum random number to generate. I.e. random numbers are - * integers from [0,rand_max]. - * @param[in] uniq_build_tbl_keys if each key in the build table should appear exactly once. + * @param[in] multiplicity number of matches for each key. */ template void generate_input_tables(key_type* const build_tbl, @@ -162,8 +127,7 @@ void generate_input_tables(key_type* const build_tbl, key_type* const probe_tbl, const size_type probe_tbl_size, const double selectivity, - const key_type rand_max, - const bool uniq_build_tbl_keys) + const int multiplicity) { // With large values of rand_max the a lot of temporary storage is needed for the lottery. At the // expense of not being that accurate with applying the selectivity an especially more memory @@ -171,9 +135,7 @@ void generate_input_tables(key_type* const build_tbl, // let one table choose random numbers from only one interval and the other only select with // selective probability from the same interval and from the other in the other cases. - static_assert(std::is_signed::value, "key_type needs to be signed for lottery to work"); - - const int block_size = 128; + constexpr int block_size = 128; // Maximize exposed parallelism while minimizing storage for curand state int num_blocks_init_build_tbl{-1}; @@ -198,55 +160,20 @@ void generate_input_tables(key_type* const build_tbl, CHECK_CUDA(0); - size_type lottery_size = - rand_max < std::numeric_limits::max() - 1 ? rand_max + 1 : rand_max; - rmm::device_uvector lottery(lottery_size, rmm::cuda_stream_default); - - if (uniq_build_tbl_keys) { - thrust::sequence(rmm::exec_policy(), lottery.begin(), lottery.end(), 0); - } - - init_build_tbl - <<>>(build_tbl, - build_tbl_size, - rand_max, - uniq_build_tbl_keys, - lottery.data(), - lottery_size, - devStates.data(), - num_states); + init_build_tbl<<>>( + build_tbl, build_tbl_size, multiplicity, devStates.data(), num_states); CHECK_CUDA(0); - rmm::device_uvector build_tbl_sorted(build_tbl_size, rmm::cuda_stream_default); - - CUDA_TRY(cudaMemcpy(build_tbl_sorted.data(), - build_tbl, - build_tbl_size * sizeof(key_type), - cudaMemcpyDeviceToDevice)); - - thrust::sort(rmm::exec_policy(), build_tbl_sorted.begin(), build_tbl_sorted.end()); - - // Exclude keys used in build table from lottery - thrust::counting_iterator first_lottery_elem(0); - thrust::counting_iterator last_lottery_elem = first_lottery_elem + lottery_size; - key_type* lottery_end = thrust::set_difference(rmm::exec_policy(), - first_lottery_elem, - last_lottery_elem, - build_tbl_sorted.begin(), - build_tbl_sorted.end(), - lottery.data()); - - lottery_size = thrust::distance(lottery.data(), lottery_end); + auto const rand_max = std::numeric_limits::max(); init_probe_tbl <<>>(probe_tbl, probe_tbl_size, - build_tbl, build_tbl_size, - lottery.data(), - lottery_size, + rand_max, selectivity, + multiplicity, devStates.data(), num_states); diff --git a/cpp/benchmarks/join/join_benchmark_common.hpp b/cpp/benchmarks/join/join_benchmark_common.hpp index add87bf7dfb..e88253395d8 100644 --- a/cpp/benchmarks/join/join_benchmark_common.hpp +++ b/cpp/benchmarks/join/join_benchmark_common.hpp @@ -60,14 +60,13 @@ static void BM_join(state_type& state, Join JoinFunc) } }(); - const cudf::size_type rand_max_val{build_table_size * 2}; - const double selectivity = 0.3; - const bool is_build_table_key_unique = true; + const double selectivity = 0.3; + const int multiplicity = 1; // Generate build and probe tables cudf::test::UniformRandomGenerator rand_gen(0, build_table_size); auto build_random_null_mask = [&rand_gen](int size) { - // roughly 25% nulls + // roughly 75% nulls auto validity = thrust::make_transform_iterator( thrust::make_counting_iterator(0), [&rand_gen](auto i) { return (rand_gen.generate() & 3) == 0; }); @@ -95,8 +94,7 @@ static void BM_join(state_type& state, Join JoinFunc) probe_key_column->mutable_view().data(), probe_table_size, selectivity, - rand_max_val, - is_build_table_key_unique); + multiplicity); auto payload_data_it = thrust::make_counting_iterator(0); cudf::test::fixed_width_column_wrapper build_payload_column( @@ -125,12 +123,12 @@ static void BM_join(state_type& state, Join JoinFunc) if constexpr (std::is_same_v and (not is_conditional)) { state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { rmm::cuda_stream_view stream_view{launch.get_stream()}; - JoinFunc(probe_table, - build_table, - columns_to_join, - columns_to_join, - cudf::null_equality::UNEQUAL, - stream_view); + auto result = JoinFunc(probe_table, + build_table, + columns_to_join, + columns_to_join, + cudf::null_equality::UNEQUAL, + stream_view); }); } diff --git a/cpp/cmake/thirdparty/get_cucollections.cmake b/cpp/cmake/thirdparty/get_cucollections.cmake index 47dbc037334..6764c78ed87 100644 --- a/cpp/cmake/thirdparty/get_cucollections.cmake +++ b/cpp/cmake/thirdparty/get_cucollections.cmake @@ -14,6 +14,8 @@ # limitations under the License. #============================================================================= +# cuCollections doesn't have a version + function(find_and_configure_cucollections) # Find or install cuCollections @@ -21,7 +23,7 @@ function(find_and_configure_cucollections) GLOBAL_TARGETS cuco::cuco CPM_ARGS GITHUB_REPOSITORY NVIDIA/cuCollections - GIT_TAG 729857a5698a0e8d8f812e0464f65f37854ae17b + GIT_TAG 62b90b7f7adf272455007b1c857e1d621aaf13ca OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF" diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index c1a5f8f17c3..99a94c45510 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include @@ -33,6 +34,26 @@ namespace cudf { namespace detail { +namespace { + +/** + * @brief Device functor to determine if a row is valid. + */ +class row_is_valid { + public: + row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {} + + __device__ __inline__ bool operator()(const size_type& i) const noexcept + { + return cudf::bit_is_set(_row_bitmask, i); + } + + private: + bitmask_type const* _row_bitmask; +}; + +} // anonymous namespace + std::pair, std::unique_ptr> get_empty_joined_table( table_view const& probe, table_view const& build) { @@ -44,51 +65,39 @@ std::pair, std::unique_ptr
> get_empty_joined_table /** * @brief Builds the hash table based on the given `build_table`. * - * @throw cudf::logic_error if the number of columns in `build` table is 0. - * @throw cudf::logic_error if the number of rows in `build` table is 0. - * @throw cudf::logic_error if insertion to the hash table fails. - * * @param build Table of columns used to build join hash. + * @param hash_table Build hash table. * @param compare_nulls Controls whether null join-key values should match or not. * @param stream CUDA stream used for device memory operations and kernel launches. * - * @return Built hash table. */ -std::unique_ptr> build_join_hash_table( - cudf::table_view const& build, null_equality compare_nulls, rmm::cuda_stream_view stream) +void build_join_hash_table(cudf::table_view const& build, + multimap_type& hash_table, + null_equality compare_nulls, + rmm::cuda_stream_view stream) { - auto build_device_table = cudf::table_device_view::create(build, stream); - - CUDF_EXPECTS(0 != build_device_table->num_columns(), "Selected build dataset is empty"); - CUDF_EXPECTS(0 != build_device_table->num_rows(), "Build side table has no rows"); - - size_type const build_table_num_rows{build_device_table->num_rows()}; - std::size_t const hash_table_size = compute_hash_table_size(build_table_num_rows); - - auto hash_table = multimap_type::create(hash_table_size, - stream, - true, - multimap_type::hasher(), - multimap_type::key_equal(), - multimap_type::allocator_type()); - - row_hash hash_build{*build_device_table}; - rmm::device_scalar failure(0, stream); - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(build_table_num_rows, block_size); - auto const row_bitmask = (compare_nulls == null_equality::EQUAL) - ? rmm::device_buffer{0, stream} - : cudf::detail::bitmask_and(build, stream); - build_hash_table<<>>( - *hash_table, - hash_build, - build_table_num_rows, - static_cast(row_bitmask.data()), - failure.data()); - // Check error code from the kernel - if (failure.value(stream) == 1) { CUDF_FAIL("Hash Table insert failure."); } - - return hash_table; + auto build_table_ptr = cudf::table_device_view::create(build, stream); + + CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty"); + CUDF_EXPECTS(0 != build_table_ptr->num_rows(), "Build side table has no rows"); + + row_hash hash_build{*build_table_ptr}; + auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); + make_pair_function pair_func{hash_build, empty_key_sentinel}; + + auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + + size_type const build_table_num_rows{build_table_ptr->num_rows()}; + if ((compare_nulls == null_equality::EQUAL) or (not nullable(build))) { + hash_table.insert(iter, iter + build_table_num_rows, stream.value()); + } else { + thrust::counting_iterator stencil(0); + auto const row_bitmask = cudf::detail::bitmask_and(build, stream); + row_is_valid pred{static_cast(row_bitmask.data())}; + + // insert valid rows + hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value()); + } } /** @@ -132,46 +141,37 @@ probe_join_hash_table(cudf::table_device_view build_table, std::make_unique>(0, stream, mr)); } - rmm::device_scalar write_index(0, stream); - auto left_indices = std::make_unique>(join_size, stream, mr); auto right_indices = std::make_unique>(join_size, stream, mr); - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(probe_table.num_rows(), block_size); + pair_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; row_hash hash_probe{probe_table}; - row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { - probe_hash_table - <<>>( - hash_table, - build_table, - probe_table, - hash_probe, - equality, - left_indices->data(), - right_indices->data(), - write_index.data(), - join_size); - auto const actual_size = write_index.value(stream); - left_indices->resize(actual_size, stream); - right_indices->resize(actual_size, stream); + auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); + make_pair_function pair_func{hash_probe, empty_key_sentinel}; + + auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + + const cudf::size_type probe_table_num_rows = probe_table.num_rows(); + + auto out1_zip_begin = thrust::make_zip_iterator( + thrust::make_tuple(thrust::make_discard_iterator(), left_indices->begin())); + auto out2_zip_begin = thrust::make_zip_iterator( + thrust::make_tuple(thrust::make_discard_iterator(), right_indices->begin())); + + if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN or + JoinKind == cudf::detail::join_kind::LEFT_JOIN) { + [[maybe_unused]] auto [out1_zip_end, out2_zip_end] = hash_table.pair_retrieve_outer( + iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); + + if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { + auto const actual_size = out1_zip_end - out1_zip_begin; + left_indices->resize(actual_size, stream); + right_indices->resize(actual_size, stream); + } } else { - probe_hash_table - <<>>( - hash_table, - build_table, - probe_table, - hash_probe, - equality, - left_indices->data(), - right_indices->data(), - write_index.data(), - join_size); + hash_table.pair_retrieve( + iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); } return std::make_pair(std::move(left_indices), std::move(right_indices)); } @@ -209,24 +209,24 @@ std::size_t get_full_join_size(cudf::table_device_view build_table, auto left_indices = std::make_unique>(join_size, stream, mr); auto right_indices = std::make_unique>(join_size, stream, mr); - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(probe_table.num_rows(), block_size); + pair_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; row_hash hash_probe{probe_table}; - row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - probe_hash_table - <<>>(hash_table, - build_table, - probe_table, - hash_probe, - equality, - left_indices->data(), - right_indices->data(), - write_index.data(), - join_size); + auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); + make_pair_function pair_func{hash_probe, empty_key_sentinel}; + + auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + + const cudf::size_type probe_table_num_rows = probe_table.num_rows(); + + auto out1_zip_begin = thrust::make_zip_iterator( + thrust::make_tuple(thrust::make_discard_iterator(), left_indices->begin())); + auto out2_zip_begin = thrust::make_zip_iterator( + thrust::make_tuple(thrust::make_discard_iterator(), right_indices->begin())); + + hash_table.pair_retrieve_outer( + iter, iter + probe_table_num_rows, out1_zip_begin, out2_zip_begin, equality, stream.value()); + // Release intermediate memory allocation left_indices->resize(0, stream); @@ -286,7 +286,11 @@ hash_join::hash_join_impl::~hash_join_impl() = default; hash_join::hash_join_impl::hash_join_impl(cudf::table_view const& build, null_equality compare_nulls, rmm::cuda_stream_view stream) - : _hash_table(nullptr) + : _is_empty{build.num_rows() == 0}, + _hash_table{compute_hash_table_size(build.num_rows()), + std::numeric_limits::max(), + cudf::detail::JoinNoneValue, + stream.value()} { CUDF_FUNC_RANGE(); CUDF_EXPECTS(0 != build.num_columns(), "Hash join build table is empty"); @@ -298,9 +302,9 @@ hash_join::hash_join_impl::hash_join_impl(cudf::table_view const& build, build, {}, {}, structs::detail::column_nullability::FORCE); _build = _flattened_build_table; - if (0 == build.num_rows()) { return; } + if (_is_empty) { return; } - _hash_table = build_join_hash_table(_build, compare_nulls, stream); + build_join_hash_table(_build, _hash_table, compare_nulls, stream); } std::pair>, @@ -349,7 +353,7 @@ std::size_t hash_join::hash_join_impl::inner_join_size(cudf::table_view const& p CUDF_FUNC_RANGE(); // Return directly if build table is empty - if (_hash_table == nullptr) { return 0; } + if (_is_empty) { return 0; } auto flattened_probe = structs::detail::flatten_nested_columns( probe, {}, {}, structs::detail::column_nullability::FORCE); @@ -359,7 +363,7 @@ std::size_t hash_join::hash_join_impl::inner_join_size(cudf::table_view const& p auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); return cudf::detail::compute_join_output_size( - *build_table_ptr, *flattened_probe_table_ptr, *_hash_table, compare_nulls, stream); + *build_table_ptr, *flattened_probe_table_ptr, _hash_table, compare_nulls, stream); } std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const& probe, @@ -369,7 +373,7 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const& pr CUDF_FUNC_RANGE(); // Trivial left join case - exit early - if (_hash_table == nullptr) { return probe.num_rows(); } + if (_is_empty) { return probe.num_rows(); } auto flattened_probe = structs::detail::flatten_nested_columns( probe, {}, {}, structs::detail::column_nullability::FORCE); @@ -379,7 +383,7 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const& pr auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); return cudf::detail::compute_join_output_size( - *build_table_ptr, *flattened_probe_table_ptr, *_hash_table, compare_nulls, stream); + *build_table_ptr, *flattened_probe_table_ptr, _hash_table, compare_nulls, stream); } std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const& probe, @@ -390,7 +394,7 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const& pr CUDF_FUNC_RANGE(); // Trivial left join case - exit early - if (_hash_table == nullptr) { return probe.num_rows(); } + if (_is_empty) { return probe.num_rows(); } auto flattened_probe = structs::detail::flatten_nested_columns( probe, {}, {}, structs::detail::column_nullability::FORCE); @@ -400,7 +404,7 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const& pr auto flattened_probe_table_ptr = cudf::table_device_view::create(flattened_probe_table, stream); return get_full_join_size( - *build_table_ptr, *flattened_probe_table_ptr, *_hash_table, compare_nulls, stream, mr); + *build_table_ptr, *flattened_probe_table_ptr, _hash_table, compare_nulls, stream, mr); } template @@ -449,19 +453,19 @@ hash_join::hash_join_impl::probe_join_indices(cudf::table_view const& probe, rmm::mr::device_memory_resource* mr) const { // Trivial left join case - exit early - if (_hash_table == nullptr and JoinKind != cudf::detail::join_kind::INNER_JOIN) { + if (_is_empty and JoinKind != cudf::detail::join_kind::INNER_JOIN) { return get_trivial_left_join_indices(probe, stream, mr); } - CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); + CUDF_EXPECTS(!_is_empty, "Hash table of hash join is null."); - auto build_table = cudf::table_device_view::create(_build, stream); - auto probe_table = cudf::table_device_view::create(probe, stream); + auto build_table_ptr = cudf::table_device_view::create(_build, stream); + auto probe_table_ptr = cudf::table_device_view::create(probe, stream); auto join_indices = cudf::detail::probe_join_hash_table( - *build_table, *probe_table, *_hash_table, compare_nulls, output_size, stream, mr); + *build_table_ptr, *probe_table_ptr, _hash_table, compare_nulls, output_size, stream, mr); - if (JoinKind == cudf::detail::join_kind::FULL_JOIN) { + if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { auto complement_indices = detail::get_left_join_indices_complement( join_indices.second, probe.num_rows(), _build.num_rows(), stream, mr); join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index 1d39daed457..aa3d6a20d7f 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,11 +15,11 @@ */ #pragma once -#include #include #include -#include +#include +#include #include #include #include @@ -38,6 +38,42 @@ namespace cudf { namespace detail { + +/** + * @brief Remaps a hash value to a new value if it is equal to the specified sentinel value. + * + * @param hash The hash value to potentially remap + * @param sentinel The reserved value + */ +template +constexpr auto remap_sentinel_hash(H hash, S sentinel) +{ + // Arbitrarily choose hash - 1 + return (hash == sentinel) ? (hash - 1) : hash; +} + +/** + * @brief Device functor to create a pair of hash value and index for a given row. + */ +class make_pair_function { + public: + make_pair_function(row_hash const& hash, hash_value_type const empty_key_sentinel) + : _hash{hash}, _empty_key_sentinel{empty_key_sentinel} + { + } + + __device__ __forceinline__ cudf::detail::pair_type operator()(size_type i) const noexcept + { + // Compute the hash value of row `i` + auto row_hash_value = remap_sentinel_hash(_hash(i), _empty_key_sentinel); + return cuco::make_pair(std::move(row_hash_value), std::move(i)); + } + + private: + row_hash _hash; + hash_value_type const _empty_key_sentinel; +}; + /** * @brief Calculates the exact size of the join output produced when * joining two tables together. @@ -82,41 +118,24 @@ std::size_t compute_join_output_size(table_device_view build_table, } } - // Allocate storage for the counter used to get the size of the join output - std::size_t h_size{0}; - rmm::device_scalar d_size(0, stream); - - CHECK_CUDA(stream.value()); - - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - int numBlocks{-1}; - - CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( - &numBlocks, compute_join_output_size, block_size, 0)); - - int dev_id{-1}; - CUDA_TRY(cudaGetDevice(&dev_id)); - - int num_sms{-1}; - CUDA_TRY(cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, dev_id)); + pair_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; row_hash hash_probe{probe_table}; - row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - // Probe the hash table without actually building the output to simply - // find what the size of the output will be. - compute_join_output_size - <<>>(hash_table, - build_table, - probe_table, - hash_probe, - equality, - probe_table_num_rows, - d_size.data()); - - CHECK_CUDA(stream.value()); - h_size = d_size.value(stream); - - return h_size; + auto const empty_key_sentinel = hash_table.get_empty_key_sentinel(); + make_pair_function pair_func{hash_probe, empty_key_sentinel}; + + auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func); + + size_type size; + if constexpr (JoinKind == join_kind::LEFT_JOIN) { + size = static_cast( + hash_table.pair_count_outer(iter, iter + probe_table_num_rows, equality, stream.value())); + } else { + size = static_cast( + hash_table.pair_count(iter, iter + probe_table_num_rows, equality, stream.value())); + } + + return size; } std::pair, std::unique_ptr
> get_empty_joined_table( @@ -137,11 +156,11 @@ struct hash_join::hash_join_impl { hash_join_impl& operator=(hash_join_impl&&) = delete; private: + bool _is_empty; cudf::table_view _build; std::vector> _created_null_columns; cudf::structs::detail::flattened_table _flattened_build_table; - std::unique_ptr> - _hash_table; + cudf::detail::multimap_type _hash_table; public: /** diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh index d5c23b1d612..cec633765c7 100644 --- a/cpp/src/join/join_common_utils.cuh +++ b/cpp/src/join/join_common_utils.cuh @@ -27,6 +27,26 @@ namespace cudf { namespace detail { +/** + * @brief Device functor to determine if two pairs are identical. + */ +class pair_equality { + public: + pair_equality(table_device_view lhs, table_device_view rhs, bool nulls_are_equal = true) + : _check_row_equality{lhs, rhs, nulls_are_equal} + { + } + + __device__ __forceinline__ bool operator()(const pair_type& lhs, + const pair_type& rhs) const noexcept + { + return lhs.first == rhs.first and _check_row_equality(rhs.second, lhs.second); + } + + private: + row_equality _check_row_equality; +}; + /** * @brief Computes the trivial left join operation for the case when the * right table is empty. diff --git a/cpp/src/join/join_common_utils.hpp b/cpp/src/join/join_common_utils.hpp index d2541b006a7..d6eb5e93a98 100644 --- a/cpp/src/join/join_common_utils.hpp +++ b/cpp/src/join/join_common_utils.hpp @@ -21,25 +21,29 @@ #include +#include + #include namespace cudf { namespace detail { constexpr size_type MAX_JOIN_SIZE{std::numeric_limits::max()}; +constexpr int DEFAULT_JOIN_CG_SIZE = 2; constexpr int DEFAULT_JOIN_BLOCK_SIZE = 128; constexpr int DEFAULT_JOIN_CACHE_SIZE = 128; constexpr size_type JoinNoneValue = std::numeric_limits::min(); +using pair_type = cuco::pair_type; + +using hash_type = cuco::detail::MurmurHash3_32; + using multimap_type = - concurrent_unordered_multimap::max(), - std::numeric_limits::max(), - default_hash, - equal_to, - default_allocator>>; + cuco::static_multimap, + cuco::double_hashing>; using row_hash = cudf::row_hasher; diff --git a/cpp/src/join/join_kernels.cuh b/cpp/src/join/join_kernels.cuh deleted file mode 100644 index 62bc7d0fe80..00000000000 --- a/cpp/src/join/join_kernels.cuh +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include - -#include -#include - -namespace cudf { -namespace detail { -/** - * @brief Remaps a hash value to a new value if it is equal to the specified sentinel value. - * - * @param hash The hash value to potentially remap - * @param sentinel The reserved value - */ -template -constexpr auto remap_sentinel_hash(H hash, S sentinel) -{ - // Arbitrarily choose hash - 1 - return (hash == sentinel) ? (hash - 1) : hash; -} - -/** - * @brief Builds a hash table from a row hasher that maps the hash - * values of each row to its respective row index. - * - * @tparam multimap_type The type of the hash table - * - * @param[in,out] multi_map The hash table to be built to insert rows into - * @param[in] hash_build Row hasher for the build table - * @param[in] build_table_num_rows The number of rows in the build table - * @param[in] row_bitmask Bitmask where bit `i` indicates the presence of a null - * value in row `i` of input keys. This is nullptr if nulls are equal. - * @param[out] error Pointer used to set an error code if the insert fails - */ -template -__global__ void build_hash_table(multimap_type multi_map, - row_hash hash_build, - const cudf::size_type build_table_num_rows, - bitmask_type const* row_bitmask, - int* error) -{ - cudf::size_type i = threadIdx.x + blockIdx.x * blockDim.x; - - while (i < build_table_num_rows) { - if (!row_bitmask || cudf::bit_is_set(row_bitmask, i)) { - // Compute the hash value of this row - auto const row_hash_value = remap_sentinel_hash(hash_build(i), multi_map.get_unused_key()); - - // Insert the (row hash value, row index) into the map - // using the row hash value to determine the location in the - // hash map where the new pair should be inserted - auto const insert_location = - multi_map.insert(thrust::make_pair(row_hash_value, i), true, row_hash_value); - - // If the insert failed, set the error code accordingly - if (multi_map.end() == insert_location) { *error = 1; } - } - i += blockDim.x * gridDim.x; - } -} - -/** - * @brief Computes the output size of joining the probe table to the build table - * by probing the hash map with the probe table and counting the number of matches. - * - * @tparam JoinKind The type of join to be performed - * @tparam multimap_type The datatype of the hash table - * @tparam block_size The number of threads per block for this kernel - * - * @param[in] multi_map The hash table built on the build table - * @param[in] build_table The build table - * @param[in] probe_table The probe table - * @param[in] hash_probe Row hasher for the probe table - * @param[in] check_row_equality The row equality comparator - * @param[in] probe_table_num_rows The number of rows in the probe table - * @param[out] output_size The resulting output size - */ -template -__global__ void compute_join_output_size(multimap_type multi_map, - table_device_view build_table, - table_device_view probe_table, - row_hash hash_probe, - row_equality check_row_equality, - const cudf::size_type probe_table_num_rows, - std::size_t* output_size) -{ - // This kernel probes multiple elements in the probe_table and store the number of matches found - // inside a register. A block reduction is used at the end to calculate the matches per thread - // block, and atomically add to the global 'output_size'. Compared to probing one element per - // thread, this implementation improves performance by reducing atomic adds to the shared memory - // counter. - - cudf::size_type thread_counter{0}; - const cudf::size_type start_idx = threadIdx.x + blockIdx.x * blockDim.x; - const cudf::size_type stride = blockDim.x * gridDim.x; - const auto unused_key = multi_map.get_unused_key(); - const auto end = multi_map.end(); - - for (cudf::size_type probe_row_index = start_idx; probe_row_index < probe_table_num_rows; - probe_row_index += stride) { - // Search the hash map for the hash value of the probe row using the row's - // hash value to determine the location where to search for the row in the hash map - auto const probe_row_hash_value = remap_sentinel_hash(hash_probe(probe_row_index), unused_key); - - auto found = multi_map.find(probe_row_hash_value, true, probe_row_hash_value); - - // for left-joins we always need to add an output - bool running = (JoinKind == join_kind::LEFT_JOIN) || (end != found); - bool found_match = false; - - while (running) { - // TODO Simplify this logic... - - // Left joins always have an entry in the output - if (JoinKind == join_kind::LEFT_JOIN && (end == found)) { - running = false; - } - // Stop searching after encountering an empty hash table entry - else if (unused_key == found->first) { - running = false; - } - // First check that the hash values of the two rows match - else if (found->first == probe_row_hash_value) { - // If the hash values are equal, check that the rows are equal - if (check_row_equality(probe_row_index, found->second)) { - // If the rows are equal, then we have found a true match - found_match = true; - ++thread_counter; - } - // Continue searching for matching rows until you hit an empty hash map entry - ++found; - // If you hit the end of the hash map, wrap around to the beginning - if (end == found) found = multi_map.begin(); - // Next entry is empty, stop searching - if (unused_key == found->first) running = false; - } else { - // Continue searching for matching rows until you hit an empty hash table entry - ++found; - // If you hit the end of the hash map, wrap around to the beginning - if (end == found) found = multi_map.begin(); - // Next entry is empty, stop searching - if (unused_key == found->first) running = false; - } - - if ((JoinKind == join_kind::LEFT_JOIN) && (!running) && (!found_match)) { ++thread_counter; } - } - } - - using BlockReduce = cub::BlockReduce; - __shared__ typename BlockReduce::TempStorage temp_storage; - std::size_t block_counter = BlockReduce(temp_storage).Sum(thread_counter); - - // Add block counter to global counter - if (threadIdx.x == 0) atomicAdd(output_size, block_counter); -} - -/** - * @brief Probes the hash map with the probe table to find all matching rows - * between the probe and hash table and generate the output for the desired - * Join operation. - * - * @tparam JoinKind The type of join to be performed - * @tparam multimap_type The type of the hash table - * @tparam block_size The number of threads per block for this kernel - * @tparam output_cache_size The side of the shared memory buffer to cache join output results - * - * @param[in] multi_map The hash table built from the build table - * @param[in] build_table The build table - * @param[in] probe_table The probe table - * @param[in] hash_probe Row hasher for the probe table - * @param[in] check_row_equality The row equality comparator - * @param[out] join_output_l The left result of the join operation - * @param[out] join_output_r The right result of the join operation - * @param[in,out] current_idx A global counter used by threads to coordinate writes to the global - output - * @param[in] max_size The maximum size of the output - */ -template -__global__ void probe_hash_table(multimap_type multi_map, - table_device_view build_table, - table_device_view probe_table, - row_hash hash_probe, - row_equality check_row_equality, - size_type* join_output_l, - size_type* join_output_r, - cudf::size_type* current_idx, - const std::size_t max_size) -{ - constexpr int num_warps = block_size / detail::warp_size; - __shared__ size_type current_idx_shared[num_warps]; - __shared__ size_type join_shared_l[num_warps][output_cache_size]; - __shared__ size_type join_shared_r[num_warps][output_cache_size]; - - const int warp_id = threadIdx.x / detail::warp_size; - const int lane_id = threadIdx.x % detail::warp_size; - const cudf::size_type probe_table_num_rows = probe_table.num_rows(); - - if (0 == lane_id) { current_idx_shared[warp_id] = 0; } - - __syncwarp(); - - size_type probe_row_index = threadIdx.x + blockIdx.x * blockDim.x; - - const unsigned int activemask = __ballot_sync(0xffffffff, probe_row_index < probe_table_num_rows); - if (probe_row_index < probe_table_num_rows) { - const auto unused_key = multi_map.get_unused_key(); - const auto end = multi_map.end(); - - // Search the hash map for the hash value of the probe row using the row's - // hash value to determine the location where to search for the row in the hash map - auto const probe_row_hash_value = remap_sentinel_hash(hash_probe(probe_row_index), unused_key); - - auto found = multi_map.find(probe_row_hash_value, true, probe_row_hash_value); - - bool running = (JoinKind == join_kind::LEFT_JOIN) || - (end != found); // for left-joins we always need to add an output - bool found_match = false; - while (__any_sync(activemask, running)) { - if (running) { - // TODO Simplify this logic... - - // Left joins always have an entry in the output - if ((JoinKind == join_kind::LEFT_JOIN) && (end == found)) { - running = false; - } - // Stop searching after encountering an empty hash table entry - else if (unused_key == found->first) { - running = false; - } - // First check that the hash values of the two rows match - else if (found->first == probe_row_hash_value) { - // If the hash values are equal, check that the rows are equal - // TODO : REMOVE : if(row_equal{probe_table, build_table}(probe_row_index, found->second)) - if (check_row_equality(probe_row_index, found->second)) { - // If the rows are equal, then we have found a true match - found_match = true; - add_pair_to_cache(probe_row_index, - found->second, - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); - } - // Continue searching for matching rows until you hit an empty hash map entry - ++found; - // If you hit the end of the hash map, wrap around to the beginning - if (end == found) found = multi_map.begin(); - // Next entry is empty, stop searching - if (unused_key == found->first) running = false; - } else { - // Continue searching for matching rows until you hit an empty hash table entry - ++found; - // If you hit the end of the hash map, wrap around to the beginning - if (end == found) found = multi_map.begin(); - // Next entry is empty, stop searching - if (unused_key == found->first) running = false; - } - - // If performing a LEFT join and no match was found, insert a Null into the output - if ((JoinKind == join_kind::LEFT_JOIN) && (!running) && (!found_match)) { - add_pair_to_cache(probe_row_index, - static_cast(JoinNoneValue), - current_idx_shared, - warp_id, - join_shared_l[warp_id], - join_shared_r[warp_id]); - } - } - - __syncwarp(activemask); - // flush output cache if next iteration does not fit - if (current_idx_shared[warp_id] + detail::warp_size >= output_cache_size) { - flush_output_cache(activemask, - max_size, - warp_id, - lane_id, - current_idx, - current_idx_shared, - join_shared_l, - join_shared_r, - join_output_l, - join_output_r); - __syncwarp(activemask); - if (0 == lane_id) { current_idx_shared[warp_id] = 0; } - __syncwarp(activemask); - } - } - - // final flush of output cache - if (current_idx_shared[warp_id] > 0) { - flush_output_cache(activemask, - max_size, - warp_id, - lane_id, - current_idx, - current_idx_shared, - join_shared_l, - join_shared_r, - join_output_l, - join_output_r); - } - } -} - -} // namespace detail - -} // namespace cudf diff --git a/cpp/tests/join/join_tests.cpp b/cpp/tests/join/join_tests.cpp index 6a515efc3bb..d64b40c38b3 100644 --- a/cpp/tests/join/join_tests.cpp +++ b/cpp/tests/join/join_tests.cpp @@ -1437,21 +1437,25 @@ TEST_F(JoinDictionaryTest, LeftJoinNoNulls) auto t1 = cudf::table_view({col1_0, col1_1->view(), col1_2}); auto g0 = cudf::table_view({col0_0, col0_1_w, col0_2}); auto g1 = cudf::table_view({col1_0, col1_1_w, col1_2}); - { - auto result = cudf::left_join(t0, t1, {0}, {0}); - auto result_view = result->view(); - auto decoded1 = cudf::dictionary::decode(result_view.column(1)); - auto decoded4 = cudf::dictionary::decode(result_view.column(4)); - std::vector result_decoded({result_view.column(0), - decoded1->view(), - result_view.column(2), - result_view.column(3), - decoded4->view(), - result_view.column(5)}); - - auto gold = cudf::left_join(g0, g1, {0}, {0}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); - } + + auto result = cudf::left_join(t0, t1, {0}, {0}); + auto result_view = result->view(); + auto decoded1 = cudf::dictionary::decode(result_view.column(1)); + auto decoded4 = cudf::dictionary::decode(result_view.column(4)); + std::vector result_decoded({result_view.column(0), + decoded1->view(), + result_view.column(2), + result_view.column(3), + decoded4->view(), + result_view.column(5)}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); + + auto gold = cudf::left_join(g0, g1, {0}, {0}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinDictionaryTest, LeftJoinWithNulls) @@ -1479,11 +1483,16 @@ TEST_F(JoinDictionaryTest, LeftJoinWithNulls) result_view.column(3), result_view.column(4), decoded5->view()}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); + + auto g0 = cudf::table_view({col0_0, col0_1, col0_2_w}); + auto g1 = cudf::table_view({col1_0, col1_1, col1_2_w}); + auto gold = cudf::left_join(g0, g1, {0, 1}, {0, 1}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); - auto g0 = cudf::table_view({col0_0, col0_1, col0_2_w}); - auto g1 = cudf::table_view({col1_0, col1_1, col1_2_w}); - auto gold = cudf::left_join(g0, g1, {0, 1}, {0, 1}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinDictionaryTest, InnerJoinNoNulls) @@ -1511,11 +1520,16 @@ TEST_F(JoinDictionaryTest, InnerJoinNoNulls) result_view.column(3), decoded4->view(), result_view.column(5)}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); + + auto g0 = cudf::table_view({col0_0, col0_1_w, col0_2}); + auto g1 = cudf::table_view({col1_0, col1_1_w, col1_2}); + auto gold = cudf::inner_join(g0, g1, {0, 1}, {0, 1}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); - auto g0 = cudf::table_view({col0_0, col0_1_w, col0_2}); - auto g1 = cudf::table_view({col1_0, col1_1_w, col1_2}); - auto gold = cudf::inner_join(g0, g1, {0, 1}, {0, 1}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinDictionaryTest, InnerJoinWithNulls) @@ -1543,11 +1557,16 @@ TEST_F(JoinDictionaryTest, InnerJoinWithNulls) result_view.column(3), result_view.column(4), decoded5->view()}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); - auto g0 = cudf::table_view({col0_0, col0_1, col0_2_w}); - auto g1 = cudf::table_view({col1_0, col1_1, col1_2_w}); - auto gold = cudf::inner_join(g0, g1, {0, 1}, {0, 1}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); + auto g0 = cudf::table_view({col0_0, col0_1, col0_2_w}); + auto g1 = cudf::table_view({col1_0, col1_1, col1_2_w}); + auto gold = cudf::inner_join(g0, g1, {0, 1}, {0, 1}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinDictionaryTest, FullJoinNoNulls) @@ -1575,11 +1594,16 @@ TEST_F(JoinDictionaryTest, FullJoinNoNulls) result_view.column(3), decoded4->view(), result_view.column(5)}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); + + auto g0 = cudf::table_view({col0_0, col0_1_w, col0_2}); + auto g1 = cudf::table_view({col1_0, col1_1_w, col1_2}); + auto gold = cudf::full_join(g0, g1, {0, 1}, {0, 1}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); - auto g0 = cudf::table_view({col0_0, col0_1_w, col0_2}); - auto g1 = cudf::table_view({col1_0, col1_1_w, col1_2}); - auto gold = cudf::full_join(g0, g1, {0, 1}, {0, 1}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinDictionaryTest, FullJoinWithNulls) @@ -1607,11 +1631,16 @@ TEST_F(JoinDictionaryTest, FullJoinWithNulls) decoded3->view(), result_view.column(4), result_view.column(5)}); + auto result_sort_order = cudf::sorted_order(cudf::table_view(result_decoded)); + auto sorted_result = cudf::gather(cudf::table_view(result_decoded), *result_sort_order); + + auto g0 = cudf::table_view({col0_0_w, col0_1, col0_2}); + auto g1 = cudf::table_view({col1_0_w, col1_1, col1_2}); + auto gold = cudf::full_join(g0, g1, {0, 1}, {0, 1}); + auto gold_sort_order = cudf::sorted_order(gold->view()); + auto sorted_gold = cudf::gather(gold->view(), *gold_sort_order); - auto g0 = cudf::table_view({col0_0_w, col0_1, col0_2}); - auto g1 = cudf::table_view({col1_0_w, col1_1, col1_2}); - auto gold = cudf::full_join(g0, g1, {0, 1}, {0, 1}); - CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*gold, cudf::table_view(result_decoded)); + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*sorted_gold, *sorted_result); } TEST_F(JoinTest, FullJoinWithStructsAndNulls) diff --git a/python/cudf/cudf/tests/test_indexing.py b/python/cudf/cudf/tests/test_indexing.py index e550c7c374e..90a20e2bab4 100644 --- a/python/cudf/cudf/tests/test_indexing.py +++ b/python/cudf/cudf/tests/test_indexing.py @@ -361,7 +361,10 @@ def test_dataframe_loc_duplicate_index_scalar(): pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5]}, index=[1, 2, 1, 4, 2]) gdf = cudf.DataFrame.from_pandas(pdf) - assert_eq(pdf.loc[2], gdf.loc[2]) + pdf_sorted = pdf.sort_values(by=list(pdf.columns), axis=0) + gdf_sorted = gdf.sort_values(by=list(gdf.columns), axis=0) + + assert_eq(pdf_sorted, gdf_sorted) @pytest.mark.parametrize( diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index 775b866f5ce..88e822c27c4 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -178,8 +178,11 @@ def test_dataframe_join_suffix(): # Check assert list(expect.columns) == list(got.columns) assert_eq(expect.index.values, got.index.values) - for k in expect.columns: - _check_series(expect[k].fillna(-1), got[k].fillna(-1)) + + got_sorted = got.sort_values(by=list(got.columns), axis=0) + expect_sorted = expect.sort_values(by=list(expect.columns), axis=0) + for k in expect_sorted.columns: + _check_series(expect_sorted[k].fillna(-1), got_sorted[k].fillna(-1)) def test_dataframe_join_cats(): @@ -1356,7 +1359,9 @@ def test_categorical_typecast_inner(): expect_dtype = CategoricalDtype(categories=[1, 2, 3], ordered=False) expect_data = cudf.Series([1, 2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"], check_categorical=False) + assert_join_results_equal( + expect_data, result["key"], how="inner", check_categorical=False + ) # Equal categories, unequal ordering -> error left = make_categorical_dataframe([1, 2, 3], ordered=False) @@ -1374,7 +1379,9 @@ def test_categorical_typecast_inner(): expect_dtype = cudf.CategoricalDtype(categories=[2, 3], ordered=False) expect_data = cudf.Series([2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"], check_categorical=False) + assert_join_results_equal( + expect_data, result["key"], how="inner", check_categorical=False + ) # One is ordered -> error left = make_categorical_dataframe([1, 2, 3], ordered=False) @@ -1404,7 +1411,7 @@ def test_categorical_typecast_left(): expect_dtype = CategoricalDtype(categories=[1, 2, 3], ordered=False) expect_data = cudf.Series([1, 2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"]) + assert_join_results_equal(expect_data, result["key"], how="left") # equal categories, unequal ordering -> error left = make_categorical_dataframe([1, 2, 3], ordered=True) @@ -1423,7 +1430,7 @@ def test_categorical_typecast_left(): expect_dtype = CategoricalDtype(categories=[1, 2, 3], ordered=False) expect_data = cudf.Series([1, 2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"].sort_values().reset_index(drop=True)) + assert_join_results_equal(expect_data, result["key"], how="left") # unequal categories, unequal ordering -> error left = make_categorical_dataframe([1, 2, 3], ordered=True) @@ -1458,7 +1465,7 @@ def test_categorical_typecast_outer(): expect_dtype = CategoricalDtype(categories=[1, 2, 3], ordered=False) expect_data = cudf.Series([1, 2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"]) + assert_join_results_equal(expect_data, result["key"], how="outer") # equal categories, both ordered -> common dtype left = make_categorical_dataframe([1, 2, 3], ordered=True) @@ -1468,7 +1475,7 @@ def test_categorical_typecast_outer(): expect_dtype = CategoricalDtype(categories=[1, 2, 3], ordered=True) expect_data = cudf.Series([1, 2, 3], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"]) + assert_join_results_equal(expect_data, result["key"], how="outer") # equal categories, one ordered -> error left = make_categorical_dataframe([1, 2, 3], ordered=False) @@ -1487,7 +1494,7 @@ def test_categorical_typecast_outer(): expect_dtype = CategoricalDtype(categories=[1, 2, 3, 4], ordered=False) expect_data = cudf.Series([1, 2, 3, 4], dtype=expect_dtype, name="key") - assert_eq(expect_data, result["key"].sort_values().reset_index(drop=True)) + assert_join_results_equal(expect_data, result["key"], how="outer") # unequal categories, one ordered -> error left = make_categorical_dataframe([1, 2, 3], ordered=False) diff --git a/python/cudf/cudf/tests/test_multiindex.py b/python/cudf/cudf/tests/test_multiindex.py index 2ded4925964..d409a099806 100644 --- a/python/cudf/cudf/tests/test_multiindex.py +++ b/python/cudf/cudf/tests/test_multiindex.py @@ -299,7 +299,7 @@ def test_multiindex_loc(pdf, gdf, pdfIndex, key_tuple): assert_eq(pdfIndex, gdfIndex) pdf.index = pdfIndex gdf.index = gdfIndex - assert_eq(pdf.loc[key_tuple], gdf.loc[key_tuple]) + assert_eq(pdf.loc[key_tuple].sort_index(), gdf.loc[key_tuple].sort_index()) @pytest.mark.parametrize( @@ -966,26 +966,34 @@ def test_multiindex_rows_with_wildcard(pdf, gdf, pdfIndex): gdfIndex = cudf.from_pandas(pdfIndex) pdf.index = pdfIndex gdf.index = gdfIndex - assert_eq(pdf.loc[("a",), :], gdf.loc[("a",), :]) - assert_eq(pdf.loc[(("a"), ("store")), :], gdf.loc[(("a"), ("store")), :]) + assert_eq(pdf.loc[("a",), :].sort_index(), gdf.loc[("a",), :].sort_index()) assert_eq( - pdf.loc[(("a"), ("store"), ("storm")), :], - gdf.loc[(("a"), ("store"), ("storm")), :], + pdf.loc[(("a"), ("store")), :].sort_index(), + gdf.loc[(("a"), ("store")), :].sort_index(), ) assert_eq( - pdf.loc[(("a"), ("store"), ("storm"), ("smoke")), :], - gdf.loc[(("a"), ("store"), ("storm"), ("smoke")), :], + pdf.loc[(("a"), ("store"), ("storm")), :].sort_index(), + gdf.loc[(("a"), ("store"), ("storm")), :].sort_index(), ) assert_eq( - pdf.loc[(slice(None), "store"), :], gdf.loc[(slice(None), "store"), :] + pdf.loc[(("a"), ("store"), ("storm"), ("smoke")), :].sort_index(), + gdf.loc[(("a"), ("store"), ("storm"), ("smoke")), :].sort_index(), ) assert_eq( - pdf.loc[(slice(None), slice(None), "storm"), :], - gdf.loc[(slice(None), slice(None), "storm"), :], + pdf.loc[(slice(None), "store"), :].sort_index(), + gdf.loc[(slice(None), "store"), :].sort_index(), ) assert_eq( - pdf.loc[(slice(None), slice(None), slice(None), "smoke"), :], - gdf.loc[(slice(None), slice(None), slice(None), "smoke"), :], + pdf.loc[(slice(None), slice(None), "storm"), :].sort_index(), + gdf.loc[(slice(None), slice(None), "storm"), :].sort_index(), + ) + assert_eq( + pdf.loc[ + (slice(None), slice(None), slice(None), "smoke"), : + ].sort_index(), + gdf.loc[ + (slice(None), slice(None), slice(None), "smoke"), : + ].sort_index(), ) diff --git a/python/cudf/cudf/tests/test_replace.py b/python/cudf/cudf/tests/test_replace.py index 6543af36dd4..2e7936feeae 100644 --- a/python/cudf/cudf/tests/test_replace.py +++ b/python/cudf/cudf/tests/test_replace.py @@ -58,7 +58,10 @@ def test_series_replace_all(gsr, to_replace, value): actual = gsr.replace(to_replace=gd_to_replace, value=gd_value) expected = psr.replace(to_replace=pd_to_replace, value=pd_value) - assert_eq(expected, actual) + assert_eq( + expected.sort_values().reset_index(drop=True), + actual.sort_values().reset_index(drop=True), + ) def test_series_replace(): @@ -75,7 +78,10 @@ def test_series_replace(): psr4 = psr3.replace("one", "two") sr3 = cudf.from_pandas(psr3) sr4 = sr3.replace("one", "two") - assert_eq(psr4, sr4) + assert_eq( + psr4.sort_values().reset_index(drop=True), + sr4.sort_values().reset_index(drop=True), + ) psr5 = psr3.replace("one", "five") sr5 = sr3.replace("one", "five") @@ -226,7 +232,10 @@ def test_dataframe_replace(df, to_replace, value): expected = pdf.replace(to_replace=pd_to_replace, value=pd_value) actual = gdf.replace(to_replace=gd_to_replace, value=gd_value) - assert_eq(expected, actual) + expected_sorted = expected.sort_values(by=list(expected.columns), axis=0) + actual_sorted = actual.sort_values(by=list(actual.columns), axis=0) + + assert_eq(expected_sorted, actual_sorted) def test_dataframe_replace_with_nulls(): @@ -1336,4 +1345,7 @@ def test_series_replace_errors(): def test_replace_nulls(gsr, old, new, expected): actual = gsr.replace(old, new) - assert_eq(expected, actual) + assert_eq( + expected.sort_values().reset_index(drop=True), + actual.sort_values().reset_index(drop=True), + )