Skip to content

Commit

Permalink
Merge branch 'branch-22.06' into cpp-2-reviewer-pull-req
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt committed May 3, 2022
2 parents 93b9642 + a9eb47c commit f6793a0
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 19 deletions.
8 changes: 5 additions & 3 deletions cpp/include/cudf/detail/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

Expand Down Expand Up @@ -68,9 +69,10 @@ struct hash_join {
hash_join& operator=(hash_join&&) = delete;

private:
bool const _is_empty; ///< true if `_hash_table` is empty
cudf::null_equality const _nulls_equal; ///< whether to consider nulls as equal
cudf::table_view _build; ///< input table to build the hash map
bool const _is_empty; ///< true if `_hash_table` is empty
rmm::device_buffer const _composite_bitmask; ///< Bitmask to denote whether a row is valid
cudf::null_equality const _nulls_equal; ///< whether to consider nulls as equal
cudf::table_view _build; ///< input table to build the hash map
cudf::structs::detail::flattened_table
_flattened_build_table; ///< flattened data structures for `_build`
map_type _hash_table; ///< hash table built on `_build`
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ hash_join<Hasher>::hash_join(cudf::table_view const& build,
cudf::null_equality compare_nulls,
rmm::cuda_stream_view stream)
: _is_empty{build.num_rows() == 0},
_composite_bitmask{cudf::detail::bitmask_and(build, stream).first},
_nulls_equal{compare_nulls},
_hash_table{compute_hash_table_size(build.num_rows()),
std::numeric_limits<hash_value_type>::max(),
Expand All @@ -302,7 +303,11 @@ hash_join<Hasher>::hash_join(cudf::table_view const& build,

if (_is_empty) { return; }

cudf::detail::build_join_hash_table(_build, _hash_table, _nulls_equal, stream);
cudf::detail::build_join_hash_table(_build,
_hash_table,
_nulls_equal,
static_cast<bitmask_type const*>(_composite_bitmask.data()),
stream);
}

template <typename Hasher>
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ get_trivial_left_join_indices(
* @param build Table of columns used to build join hash.
* @param hash_table Build hash table.
* @param nulls_equal Flag to denote nulls are equal or not.
* @param bitmask Bitmask to denote whether a row is valid.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
template <typename MultimapType>
void build_join_hash_table(cudf::table_view const& build,
MultimapType& hash_table,
null_equality const nulls_equal,
[[maybe_unused]] bitmask_type const* bitmask,
rmm::cuda_stream_view stream)
{
auto build_table_ptr = cudf::table_device_view::create(build, stream);
Expand All @@ -168,8 +170,7 @@ void build_join_hash_table(cudf::table_view const& build,
hash_table.insert(iter, iter + build_table_num_rows, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first;
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};
row_is_valid pred{bitmask};

// insert valid rows
hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value());
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/join/mixed_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ mixed_join(
// TODO: To add support for nested columns we will need to flatten in many
// places. However, this probably isn't worth adding any time soon since we
// won't be able to support AST conditions for those types anyway.
build_join_hash_table(build, hash_table, compare_nulls, stream);
auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first;
build_join_hash_table(
build, hash_table, compare_nulls, static_cast<bitmask_type const*>(row_bitmask.data()), stream);
auto hash_table_view = hash_table.get_device_view();

auto left_conditional_view = table_device_view::create(left_conditional, stream);
Expand Down Expand Up @@ -381,7 +383,9 @@ compute_mixed_join_output_size(table_view const& left_equality,
// TODO: To add support for nested columns we will need to flatten in many
// places. However, this probably isn't worth adding any time soon since we
// won't be able to support AST conditions for those types anyway.
build_join_hash_table(build, hash_table, compare_nulls, stream);
auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first;
build_join_hash_table(
build, hash_table, compare_nulls, static_cast<bitmask_type const*>(row_bitmask.data()), stream);
auto hash_table_view = hash_table.get_device_view();

auto left_conditional_view = table_device_view::create(left_conditional, stream);
Expand Down
88 changes: 86 additions & 2 deletions cpp/src/strings/search/find.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/scalar/scalar_factories.hpp>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/strings/find.hpp>
Expand All @@ -28,6 +30,7 @@
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

Expand Down Expand Up @@ -162,6 +165,81 @@ std::unique_ptr<column> rfind(strings_column_view const& strings,

namespace detail {
namespace {

/**
* @brief Threshold to decide on using string or warp parallel functions.
*
* If the average byte length of a string in a column exceeds this value then
* the warp-parallel `contains_warp_fn` function is used.
* Otherwise, the string-parallel function in `contains_fn` is used.
*
* This is only used for the scalar version of `contains()` right now.
*/
constexpr size_type AVG_CHAR_BYTES_THRESHOLD = 64;

/**
* @brief Check if `d_target` appears in a row in `d_strings`.
*
* This executes as a warp per string/row.
*/
/**
 * @brief Check if `d_target` appears in a row in `d_strings`.
 *
 * This executes as a warp per string/row: each of the 32 lanes probes the
 * candidate byte offsets `lane, lane+32, lane+64, ...` of its row and any
 * lane that matches sets the row's result via an atomic OR.
 */
struct contains_warp_fn {
  column_device_view const d_strings;  ///< strings column to search
  string_view const d_target;          ///< non-empty target to search for
  bool* d_results;                     ///< one result slot per string/row

  __device__ void operator()(std::size_t idx)
  {
    auto const str_idx = static_cast<size_type>(idx / cudf::detail::warp_size);
    if (d_strings.is_null(str_idx)) { return; }
    // get the string for this warp
    auto const d_str = d_strings.element<string_view>(str_idx);
    // each thread of the warp will check just part of the string;
    // `<=` (not `<`) so a target aligned with the very end of the string
    // is still checked -- reading bytes [i, i + target_size) is in-bounds
    // exactly when i + target_size <= str_size
    auto found = false;
    for (auto i = static_cast<size_type>(idx % cudf::detail::warp_size);
         !found && (i + d_target.size_bytes()) <= d_str.size_bytes();
         i += cudf::detail::warp_size) {
      // check the target matches this part of the d_str data
      if (d_target.compare(d_str.data() + i, d_target.size_bytes()) == 0) { found = true; }
    }
    if (found) { atomicOr(d_results + str_idx, true); }
  }
};

/**
 * @brief Warp-parallel search for a scalar target within each row of a strings column.
 *
 * @param input Strings column to search.
 * @param target Scalar string to search for; must be a valid scalar.
 * @param stream CUDA stream used for device memory operations and kernel launches.
 * @param mr Device memory resource used to allocate the returned column.
 * @return BOOL8 column, true for each row containing the target.
 */
std::unique_ptr<column> contains_warp_parallel(strings_column_view const& input,
                                               string_scalar const& target,
                                               rmm::cuda_stream_view stream,
                                               rmm::mr::device_memory_resource* mr)
{
  CUDF_EXPECTS(target.is_valid(stream), "Parameter target must be valid.");
  auto const d_target = string_view(target.data(), target.size());

  // build a BOOL8 output column mirroring the input column's null mask
  auto output = make_numeric_column(data_type{type_id::BOOL8},
                                    input.size(),
                                    cudf::detail::copy_bitmask(input.parent(), stream, mr),
                                    input.null_count(),
                                    stream,
                                    mr);
  auto output_view = output->mutable_view();

  if (d_target.empty()) {
    // an empty target is trivially contained in every row
    thrust::fill(
      rmm::exec_policy(stream), output_view.begin<bool>(), output_view.end<bool>(), true);
  } else {
    // initialize all rows to false, then launch one warp per string to flip
    // the rows in which the target is found
    thrust::fill(
      rmm::exec_policy(stream), output_view.begin<bool>(), output_view.end<bool>(), false);
    auto const d_strings = column_device_view::create(input.parent(), stream);
    thrust::for_each_n(rmm::exec_policy(stream),
                       thrust::make_counting_iterator<std::size_t>(0),
                       static_cast<std::size_t>(input.size()) * cudf::detail::warp_size,
                       contains_warp_fn{*d_strings, d_target, output_view.data<bool>()});
  }
  output->set_null_count(input.null_count());
  return output;
}

/**
* @brief Utility to return a bool column indicating the presence of
* a given target string in a strings column.
Expand Down Expand Up @@ -286,15 +364,21 @@ std::unique_ptr<column> contains_fn(strings_column_view const& strings,
} // namespace

std::unique_ptr<column> contains(
  strings_column_view const& input,
  string_scalar const& target,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
  // dispatch on the average bytes-per-row: wide rows benefit from a warp
  // per string, narrow rows from a thread per string
  auto const use_warp_parallel =
    !input.is_empty() && ((input.chars_size() / input.size()) > AVG_CHAR_BYTES_THRESHOLD);
  if (use_warp_parallel) { return contains_warp_parallel(input, target, stream, mr); }

  // benchmark measurements showed this to be faster for smaller strings
  auto finder = [] __device__(string_view d_string, string_view d_target) {
    return d_string.find(d_target) >= 0;
  };
  return contains_fn(input, target, finder, stream, mr);
}

std::unique_ptr<column> contains(
Expand Down
20 changes: 20 additions & 0 deletions cpp/tests/strings/find_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,26 @@ TEST_F(StringsFindTest, Contains)
}
}

TEST_F(StringsFindTest, ContainsLongStrings)
{
  // Rows are deliberately long (most exceed 64 bytes) so that contains()
  // exercises its long-string code path -- presumably the warp-parallel
  // kernel gated by AVG_CHAR_BYTES_THRESHOLD; TODO confirm average exceeds
  // the threshold for this fixture.
  // Rows 5 and 6 split a sentence ("...policy. t" / "he this is...") to
  // verify matching does not cross row/string boundaries.
  cudf::test::strings_column_wrapper strings(
    {"Héllo, there world and goodbye",
     "quick brown fox jumped over the lazy brown dog; the fat cats jump in place without moving",
     "the following code snippet demonstrates how to use search for values in an ordered range",
     "it returns the last position where value could be inserted without violating the ordering",
     "algorithms execution is parallelized as determined by an execution policy. t",
     "he this is a continuation of previous row to make sure string boundaries are honored",
     ""});
  auto strings_view = cudf::strings_column_view(strings);
  // single-character target: present in every non-empty row
  auto results = cudf::strings::contains(strings_view, cudf::string_scalar("e"));
  cudf::test::fixed_width_column_wrapper<bool> expected({1, 1, 1, 1, 1, 1, 0});
  CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected);

  // multi-character target with surrounding spaces: only rows 1 and 3 match
  results = cudf::strings::contains(strings_view, cudf::string_scalar(" the "));
  cudf::test::fixed_width_column_wrapper<bool> expected2({0, 1, 0, 1, 0, 0, 0});
  CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected2);
}

TEST_F(StringsFindTest, StartsWith)
{
cudf::test::strings_column_wrapper strings({"Héllo", "thesé", "", "lease", "tést strings", ""},
Expand Down
19 changes: 19 additions & 0 deletions python/cudf/cudf/core/reshape.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

import itertools
import warnings
from collections import abc
from typing import Dict, Optional

Expand Down Expand Up @@ -791,6 +792,24 @@ def merge_sorted(
A new, lexicographically sorted, DataFrame/Series.
"""

warnings.warn(
"merge_sorted is deprecated and will be removed in a "
"future release.",
FutureWarning,
)
return _merge_sorted(
objs, keys, by_index, ignore_index, ascending, na_position
)


def _merge_sorted(
objs,
keys=None,
by_index=False,
ignore_index=False,
ascending=True,
na_position="last",
):
if not pd.api.types.is_list_like(objs):
raise TypeError("objs must be a list-like of Frame-like objects")

Expand Down
17 changes: 9 additions & 8 deletions python/cudf/cudf/tests/test_reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@pytest.mark.parametrize("nulls", ["none", "some", "all"])
def test_melt(nulls, num_id_vars, num_value_vars, num_rows, dtype):
if dtype not in ["float32", "float64"] and nulls in ["some", "all"]:
pytest.skip(msg="nulls not supported in dtype: " + dtype)
pytest.skip(reason="nulls not supported in dtype: " + dtype)

pdf = pd.DataFrame()
id_vars = []
Expand Down Expand Up @@ -87,7 +87,7 @@ def test_melt(nulls, num_id_vars, num_value_vars, num_rows, dtype):
@pytest.mark.parametrize("nulls", ["none", "some"])
def test_df_stack(nulls, num_cols, num_rows, dtype):
if dtype not in ["float32", "float64"] and nulls in ["some"]:
pytest.skip(msg="nulls not supported in dtype: " + dtype)
pytest.skip(reason="nulls not supported in dtype: " + dtype)

pdf = pd.DataFrame()
for i in range(num_cols):
Expand Down Expand Up @@ -139,7 +139,7 @@ def test_df_stack_reset_index():
def test_interleave_columns(nulls, num_cols, num_rows, dtype):

if dtype not in ["float32", "float64"] and nulls in ["some"]:
pytest.skip(msg="nulls not supported in dtype: " + dtype)
pytest.skip(reason="nulls not supported in dtype: " + dtype)

pdf = pd.DataFrame(dtype=dtype)
for i in range(num_cols):
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_interleave_columns(nulls, num_cols, num_rows, dtype):
def test_tile(nulls, num_cols, num_rows, dtype, count):

if dtype not in ["float32", "float64"] and nulls in ["some"]:
pytest.skip(msg="nulls not supported in dtype: " + dtype)
pytest.skip(reason="nulls not supported in dtype: " + dtype)

pdf = pd.DataFrame(dtype=dtype)
for i in range(num_cols):
Expand Down Expand Up @@ -269,7 +269,7 @@ def test_df_merge_sorted(nparts, keys, na_position, ascending):
expect = df.sort_values(
keys_1, na_position=na_position, ascending=ascending
)
result = cudf.merge_sorted(
result = cudf.core.reshape._merge_sorted(
dfs, keys=keys, na_position=na_position, ascending=ascending
)
if keys:
Expand All @@ -290,7 +290,8 @@ def test_df_merge_sorted_index(nparts, index, ascending):
)

expect = df.sort_index(ascending=ascending)
result = cudf.merge_sorted(dfs, by_index=True, ascending=ascending)
with pytest.warns(FutureWarning, match="deprecated and will be removed"):
result = cudf.merge_sorted(dfs, by_index=True, ascending=ascending)

assert_eq(expect.index, result.index)

Expand All @@ -317,7 +318,7 @@ def test_df_merge_sorted_ignore_index(keys, na_position, ascending):
expect = df.sort_values(
keys_1, na_position=na_position, ascending=ascending
)
result = cudf.merge_sorted(
result = cudf.core.reshape._merge_sorted(
dfs,
keys=keys,
na_position=na_position,
Expand Down Expand Up @@ -347,7 +348,7 @@ def test_series_merge_sorted(nparts, key, na_position, ascending):
)

expect = df.sort_values(na_position=na_position, ascending=ascending)
result = cudf.merge_sorted(
result = cudf.core.reshape._merge_sorted(
dfs, na_position=na_position, ascending=ascending
)

Expand Down
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _append_counts(val, count):
return val

# Sort by calculated quantile values, then number of observations.
combined_vals_counts = gd.merge_sorted(
combined_vals_counts = gd.core.reshape._merge_sorted(
[*map(_append_counts, vals, counts)]
)
combined_counts = cupy.asnumpy(combined_vals_counts["_counts"].values)
Expand Down

0 comments on commit f6793a0

Please sign in to comment.