diff --git a/cpp/include/cudf/detail/join.hpp b/cpp/include/cudf/detail/join.hpp index 12e4aaa03fd..2a94ee22a0d 100644 --- a/cpp/include/cudf/detail/join.hpp +++ b/cpp/include/cudf/detail/join.hpp @@ -22,6 +22,7 @@ #include #include +#include <rmm/device_buffer.hpp> #include #include @@ -68,9 +69,10 @@ struct hash_join { hash_join& operator=(hash_join&&) = delete; private: - bool const _is_empty; ///< true if `_hash_table` is empty - cudf::null_equality const _nulls_equal; ///< whether to consider nulls as equal - cudf::table_view _build; ///< input table to build the hash map + bool const _is_empty; ///< true if `_hash_table` is empty + rmm::device_buffer const _composite_bitmask; ///< Bitmask to denote whether a row is valid + cudf::null_equality const _nulls_equal; ///< whether to consider nulls as equal + cudf::table_view _build; ///< input table to build the hash map cudf::structs::detail::flattened_table _flattened_build_table; ///< flattened data structures for `_build` map_type _hash_table; ///< hash table built on `_build` diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 3e0e76de708..07995ba2785 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -283,6 +283,7 @@ hash_join::hash_join(cudf::table_view const& build, cudf::null_equality compare_nulls, rmm::cuda_stream_view stream) : _is_empty{build.num_rows() == 0}, + _composite_bitmask{cudf::detail::bitmask_and(build, stream).first}, _nulls_equal{compare_nulls}, _hash_table{compute_hash_table_size(build.num_rows()), std::numeric_limits<hash_value_type>::max(), @@ -302,7 +303,11 @@ hash_join::hash_join(cudf::table_view const& build, if (_is_empty) { return; } - cudf::detail::build_join_hash_table(_build, _hash_table, _nulls_equal, stream); + cudf::detail::build_join_hash_table(_build, + _hash_table, + _nulls_equal, + static_cast<bitmask_type const*>(_composite_bitmask.data()), + stream); } template diff --git a/cpp/src/join/join_common_utils.cuh b/cpp/src/join/join_common_utils.cuh index fdb63419c84..b3994685623 100644 --- 
a/cpp/src/join/join_common_utils.cuh +++ b/cpp/src/join/join_common_utils.cuh @@ -143,6 +143,7 @@ get_trivial_left_join_indices( * @param build Table of columns used to build join hash. * @param hash_table Build hash table. * @param nulls_equal Flag to denote nulls are equal or not. + * @param bitmask Bitmask to denote whether a row is valid. * @param stream CUDA stream used for device memory operations and kernel launches. * */ @@ -150,6 +151,7 @@ template <typename MultimapType> void build_join_hash_table(cudf::table_view const& build, MultimapType& hash_table, null_equality const nulls_equal, + [[maybe_unused]] bitmask_type const* bitmask, rmm::cuda_stream_view stream) { auto build_table_ptr = cudf::table_device_view::create(build, stream); @@ -168,8 +170,7 @@ void build_join_hash_table(cudf::table_view const& build, hash_table.insert(iter, iter + build_table_num_rows, stream.value()); } else { thrust::counting_iterator<size_type> stencil(0); - auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first; - row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())}; + row_is_valid pred{bitmask}; // insert valid rows hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value()); diff --git a/cpp/src/join/mixed_join.cu b/cpp/src/join/mixed_join.cu index 27ee77e3edd..11553858e5f 100644 --- a/cpp/src/join/mixed_join.cu +++ b/cpp/src/join/mixed_join.cu @@ -135,7 +135,9 @@ mixed_join( // TODO: To add support for nested columns we will need to flatten in many // places. However, this probably isn't worth adding any time soon since we // won't be able to support AST conditions for those types anyway. 
- build_join_hash_table(build, hash_table, compare_nulls, stream); + auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first; + build_join_hash_table( + build, hash_table, compare_nulls, static_cast<bitmask_type const*>(row_bitmask.data()), stream); auto hash_table_view = hash_table.get_device_view(); auto left_conditional_view = table_device_view::create(left_conditional, stream); @@ -381,7 +383,9 @@ compute_mixed_join_output_size(table_view const& left_equality, // TODO: To add support for nested columns we will need to flatten in many // places. However, this probably isn't worth adding any time soon since we // won't be able to support AST conditions for those types anyway. - build_join_hash_table(build, hash_table, compare_nulls, stream); + auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first; + build_join_hash_table( + build, hash_table, compare_nulls, static_cast<bitmask_type const*>(row_bitmask.data()), stream); auto hash_table_view = hash_table.get_device_view(); auto left_conditional_view = table_device_view::create(left_conditional, stream); diff --git a/cpp/src/strings/search/find.cu b/cpp/src/strings/search/find.cu index 15d89069ba3..1390b304e43 100644 --- a/cpp/src/strings/search/find.cu +++ b/cpp/src/strings/search/find.cu @@ -18,6 +18,8 @@ #include #include #include +#include <cudf/detail/null_mask.hpp> +#include <cudf/detail/utilities/cuda.cuh> #include #include #include @@ -28,6 +30,7 @@ #include #include +#include <thrust/for_each.h> #include #include @@ -162,6 +165,81 @@ std::unique_ptr<column> rfind(strings_column_view const& strings, namespace detail { namespace { + +/** + * @brief Threshold to decide on using string or warp parallel functions. + * + * If the average byte length of a string in a column exceeds this value then + * the warp-parallel `contains_warp_fn` function is used. + * Otherwise, the string-parallel function in `contains_fn` is used. + * + * This is only used for the scalar version of `contains()` right now. 
+ */ +constexpr size_type AVG_CHAR_BYTES_THRESHOLD = 64; + +/** + * @brief Check if `d_target` appears in a row in `d_strings`. + * + * This executes as a warp per string/row. + */ +struct contains_warp_fn { + column_device_view const d_strings; + string_view const d_target; + bool* d_results; + + __device__ void operator()(std::size_t idx) + { + auto const str_idx = static_cast<size_type>(idx / cudf::detail::warp_size); + if (d_strings.is_null(str_idx)) { return; } + // get the string for this warp + auto const d_str = d_strings.element<string_view>(str_idx); + // each thread of the warp will check just part of the string + auto found = false; + for (auto i = static_cast<size_type>(idx % cudf::detail::warp_size); + !found && (i + d_target.size_bytes()) < d_str.size_bytes(); + i += cudf::detail::warp_size) { + // check the target matches this part of the d_str data + if (d_target.compare(d_str.data() + i, d_target.size_bytes()) == 0) { found = true; } + } + if (found) { atomicOr(d_results + str_idx, true); } + } +}; + +std::unique_ptr<column> contains_warp_parallel(strings_column_view const& input, + string_scalar const& target, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(target.is_valid(stream), "Parameter target must be valid."); + auto d_target = string_view(target.data(), target.size()); + + // create output column + auto results = make_numeric_column(data_type{type_id::BOOL8}, + input.size(), + cudf::detail::copy_bitmask(input.parent(), stream, mr), + input.null_count(), + stream, + mr); + + // fill the output with `false` unless the `d_target` is empty + auto results_view = results->mutable_view(); + thrust::fill(rmm::exec_policy(stream), + results_view.begin<bool>(), + results_view.end<bool>(), + d_target.empty()); + + if (!d_target.empty()) { + // launch warp per string + auto d_strings = column_device_view::create(input.parent(), stream); + thrust::for_each_n(rmm::exec_policy(stream), + thrust::make_counting_iterator<std::size_t>(0), + static_cast<std::size_t>(input.size()) * 
cudf::detail::warp_size, + contains_warp_fn{*d_strings, d_target, results_view.data<bool>()}); + } + results->set_null_count(input.null_count()); + return results; +} + /** * @brief Utility to return a bool column indicating the presence of * a given target string in a strings column. @@ -286,15 +364,21 @@ std::unique_ptr<column> contains_fn(strings_column_view const& strings, } // namespace std::unique_ptr<column> contains( - strings_column_view const& strings, + strings_column_view const& input, string_scalar const& target, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) { + // use warp parallel when the average string width is greater than the threshold + if (!input.is_empty() && ((input.chars_size() / input.size()) > AVG_CHAR_BYTES_THRESHOLD)) { + return contains_warp_parallel(input, target, stream, mr); + } + + // benchmark measurements showed this to be faster for smaller strings auto pfn = [] __device__(string_view d_string, string_view d_target) { return d_string.find(d_target) >= 0; }; - return contains_fn(strings, target, pfn, stream, mr); + return contains_fn(input, target, pfn, stream, mr); } std::unique_ptr<column> contains( diff --git a/cpp/tests/strings/find_tests.cpp b/cpp/tests/strings/find_tests.cpp index 177e6d97f7f..208063adcb0 100644 --- a/cpp/tests/strings/find_tests.cpp +++ b/cpp/tests/strings/find_tests.cpp @@ -82,6 +82,26 @@ TEST_F(StringsFindTest, Contains) } } +TEST_F(StringsFindTest, ContainsLongStrings) +{ + cudf::test::strings_column_wrapper strings( + {"Héllo, there world and goodbye", + "quick brown fox jumped over the lazy brown dog; the fat cats jump in place without moving", + "the following code snippet demonstrates how to use search for values in an ordered range", + "it returns the last position where value could be inserted without violating the ordering", + "algorithms execution is parallelized as determined by an execution policy. t", + "he this is a continuation of previous row to make sure string boundaries are honored", + ""}); + auto strings_view = cudf::strings_column_view(strings); + auto results = cudf::strings::contains(strings_view, cudf::string_scalar("e")); + cudf::test::fixed_width_column_wrapper<bool> expected({1, 1, 1, 1, 1, 1, 0}); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected); + + results = cudf::strings::contains(strings_view, cudf::string_scalar(" the ")); + cudf::test::fixed_width_column_wrapper<bool> expected2({0, 1, 0, 1, 0, 0, 0}); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected2); +} + TEST_F(StringsFindTest, StartsWith) { cudf::test::strings_column_wrapper strings({"Héllo", "thesé", "", "lease", "tést strings", ""}, diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 5977b63777f..b405c018983 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -1,6 +1,7 @@ # Copyright (c) 2018-2022, NVIDIA CORPORATION. import itertools +import warnings from collections import abc from typing import Dict, Optional @@ -791,6 +792,24 @@ def merge_sorted( A new, lexicographically sorted, DataFrame/Series. 
""" + warnings.warn( + "merge_sorted is deprecated and will be removed in a " + "future release.", + FutureWarning, + ) + return _merge_sorted( + objs, keys, by_index, ignore_index, ascending, na_position + ) + + +def _merge_sorted( + objs, + keys=None, + by_index=False, + ignore_index=False, + ascending=True, + na_position="last", +): if not pd.api.types.is_list_like(objs): raise TypeError("objs must be a list-like of Frame-like objects") diff --git a/python/cudf/cudf/tests/test_reshape.py b/python/cudf/cudf/tests/test_reshape.py index 14fa4be7fed..5f40de74a65 100644 --- a/python/cudf/cudf/tests/test_reshape.py +++ b/python/cudf/cudf/tests/test_reshape.py @@ -24,7 +24,7 @@ @pytest.mark.parametrize("nulls", ["none", "some", "all"]) def test_melt(nulls, num_id_vars, num_value_vars, num_rows, dtype): if dtype not in ["float32", "float64"] and nulls in ["some", "all"]: - pytest.skip(msg="nulls not supported in dtype: " + dtype) + pytest.skip(reason="nulls not supported in dtype: " + dtype) pdf = pd.DataFrame() id_vars = [] @@ -87,7 +87,7 @@ def test_melt(nulls, num_id_vars, num_value_vars, num_rows, dtype): @pytest.mark.parametrize("nulls", ["none", "some"]) def test_df_stack(nulls, num_cols, num_rows, dtype): if dtype not in ["float32", "float64"] and nulls in ["some"]: - pytest.skip(msg="nulls not supported in dtype: " + dtype) + pytest.skip(reason="nulls not supported in dtype: " + dtype) pdf = pd.DataFrame() for i in range(num_cols): @@ -139,7 +139,7 @@ def test_df_stack_reset_index(): def test_interleave_columns(nulls, num_cols, num_rows, dtype): if dtype not in ["float32", "float64"] and nulls in ["some"]: - pytest.skip(msg="nulls not supported in dtype: " + dtype) + pytest.skip(reason="nulls not supported in dtype: " + dtype) pdf = pd.DataFrame(dtype=dtype) for i in range(num_cols): @@ -176,7 +176,7 @@ def test_interleave_columns(nulls, num_cols, num_rows, dtype): def test_tile(nulls, num_cols, num_rows, dtype, count): if dtype not in ["float32", "float64"] and 
nulls in ["some"]: - pytest.skip(msg="nulls not supported in dtype: " + dtype) + pytest.skip(reason="nulls not supported in dtype: " + dtype) pdf = pd.DataFrame(dtype=dtype) for i in range(num_cols): @@ -269,7 +269,7 @@ def test_df_merge_sorted(nparts, keys, na_position, ascending): expect = df.sort_values( keys_1, na_position=na_position, ascending=ascending ) - result = cudf.merge_sorted( + result = cudf.core.reshape._merge_sorted( dfs, keys=keys, na_position=na_position, ascending=ascending ) if keys: @@ -290,7 +290,8 @@ def test_df_merge_sorted_index(nparts, index, ascending): ) expect = df.sort_index(ascending=ascending) - result = cudf.merge_sorted(dfs, by_index=True, ascending=ascending) + with pytest.warns(FutureWarning, match="deprecated and will be removed"): + result = cudf.merge_sorted(dfs, by_index=True, ascending=ascending) assert_eq(expect.index, result.index) @@ -317,7 +318,7 @@ def test_df_merge_sorted_ignore_index(keys, na_position, ascending): expect = df.sort_values( keys_1, na_position=na_position, ascending=ascending ) - result = cudf.merge_sorted( + result = cudf.core.reshape._merge_sorted( dfs, keys=keys, na_position=na_position, @@ -347,7 +348,7 @@ def test_series_merge_sorted(nparts, key, na_position, ascending): ) expect = df.sort_values(na_position=na_position, ascending=ascending) - result = cudf.merge_sorted( + result = cudf.core.reshape._merge_sorted( dfs, na_position=na_position, ascending=ascending ) diff --git a/python/dask_cudf/dask_cudf/sorting.py b/python/dask_cudf/dask_cudf/sorting.py index 880e2365fe6..1c89baba592 100644 --- a/python/dask_cudf/dask_cudf/sorting.py +++ b/python/dask_cudf/dask_cudf/sorting.py @@ -85,7 +85,7 @@ def _append_counts(val, count): return val # Sort by calculated quantile values, then number of observations. 
- combined_vals_counts = gd.merge_sorted( + combined_vals_counts = gd.core.reshape._merge_sorted( [*map(_append_counts, vals, counts)] ) combined_counts = cupy.asnumpy(combined_vals_counts["_counts"].values)