Skip to content

Commit

Permalink
Rework cudf::replace_nulls to use strings::detail::copy_if_else (#15286)
Browse files Browse the repository at this point in the history
Removes the specialized kernels for strings in `cudf::replace_nulls` and replaces them with a call to `cudf::strings::detail::copy_if_else` which is already enabled with offsetalator support and optimized for long strings.
This will also allow `cudf::replace_nulls` to use large strings with no further changes.
Also includes a `replace_nulls` benchmark for strings.

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Bradley Dice (https://github.com/bdice)

URL: #15286
  • Loading branch information
davidwendt authored Apr 2, 2024
1 parent 08d86c9 commit 13a5c7b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 110 deletions.
3 changes: 2 additions & 1 deletion cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ ConfigureNVBench(
)

# ##################################################################################################
# * reduction benchmark ---------------------------------------------------------------------------
# * replace benchmark ---------------------------------------------------------------------------
ConfigureBench(REPLACE_BENCH replace/clamp.cpp replace/nans.cpp)
ConfigureNVBench(REPLACE_NVBENCH replace/nulls.cpp)

# ##################################################################################################
# * filling benchmark -----------------------------------------------------------------------------
Expand Down
59 changes: 59 additions & 0 deletions cpp/benchmarks/replace/nulls.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>

#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/replace.hpp>
#include <cudf/scalar/scalar_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>

#include <nvbench/nvbench.cuh>

static void replace_nulls(nvbench::state& state)
{
auto const n_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const max_width = static_cast<int32_t>(state.get_int64("row_width"));

if (static_cast<std::size_t>(n_rows) * static_cast<std::size_t>(max_width) >=
static_cast<std::size_t>(std::numeric_limits<cudf::size_type>::max())) {
state.skip("Skip benchmarks greater than size_type limit");
}

data_profile const table_profile = data_profile_builder().distribution(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_width);

auto const input_table = create_random_table(
{cudf::type_id::STRING, cudf::type_id::STRING}, row_count{n_rows}, table_profile);
auto const input = input_table->view().column(0);
auto const repl = input_table->view().column(1);

state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
auto chars_size = cudf::strings_column_view(input).chars_size(cudf::get_default_stream());
state.add_global_memory_reads<nvbench::int8_t>(chars_size); // all bytes are read;
state.add_global_memory_writes<nvbench::int8_t>(chars_size);

state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto result = cudf::replace_nulls(input, repl); });
}

NVBENCH_BENCH(replace_nulls)
.set_name("replace_nulls")
.add_int64_axis("row_width", {32, 64, 128, 256, 512, 1024, 2048})
.add_int64_axis("num_rows", {32768, 262144, 2097152, 16777216});
127 changes: 18 additions & 109 deletions cpp/src/replace/nulls.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#include <cudf/null_mask.hpp>
#include <cudf/replace.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/strings/detail/copy_if_else.cuh>
#include <cudf/strings/detail/replace.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
Expand All @@ -56,63 +56,6 @@ namespace { // anonymous

static constexpr int BLOCK_SIZE = 256;

template <int phase, bool replacement_has_nulls>
CUDF_KERNEL void replace_nulls_strings(cudf::column_device_view input,
cudf::column_device_view replacement,
cudf::bitmask_type* output_valid,
cudf::size_type* offsets,
char* chars,
cudf::size_type* valid_counter)
{
cudf::size_type nrows = input.size();
auto i = cudf::detail::grid_1d::global_thread_id();
auto const stride = cudf::detail::grid_1d::grid_stride();

uint32_t active_mask = 0xffff'ffff;
active_mask = __ballot_sync(active_mask, i < nrows);
auto const lane_id{threadIdx.x % cudf::detail::warp_size};
uint32_t valid_sum{0};

while (i < nrows) {
bool input_is_valid = input.is_valid_nocheck(i);
bool output_is_valid = true;

if (replacement_has_nulls && !input_is_valid) {
output_is_valid = replacement.is_valid_nocheck(i);
}

cudf::string_view out;
if (input_is_valid) {
out = input.element<cudf::string_view>(i);
} else if (output_is_valid) {
out = replacement.element<cudf::string_view>(i);
}

bool nonzero_output = (input_is_valid || output_is_valid);

if (phase == 0) {
offsets[i] = nonzero_output ? out.size_bytes() : 0;
uint32_t bitmask = __ballot_sync(active_mask, output_is_valid);
if (0 == lane_id) {
output_valid[cudf::word_index(i)] = bitmask;
valid_sum += __popc(bitmask);
}
} else if (phase == 1) {
if (nonzero_output) std::memcpy(chars + offsets[i], out.data(), out.size_bytes());
}

i += stride;
active_mask = __ballot_sync(active_mask, i < nrows);
}

// Compute total valid count for this block and add it to global count
uint32_t block_valid_count = cudf::detail::single_lane_block_sum_reduce<BLOCK_SIZE, 0>(valid_sum);
// one thread computes and adds to output_valid_count
if (threadIdx.x == 0) {
atomicAdd(valid_counter, static_cast<cudf::size_type>(block_valid_count));
}
}

template <typename Type, bool replacement_has_nulls>
CUDF_KERNEL void replace_nulls(cudf::column_device_view input,
cudf::column_device_view replacement,
Expand Down Expand Up @@ -222,58 +165,24 @@ std::unique_ptr<cudf::column> replace_nulls_column_kernel_forwarder::operator()<
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
rmm::device_scalar<cudf::size_type> valid_counter(0, stream);
cudf::size_type* valid_count = valid_counter.data();

auto replace_first = replace_nulls_strings<0, false>;
auto replace_second = replace_nulls_strings<1, false>;
if (replacement.has_nulls()) {
replace_first = replace_nulls_strings<0, true>;
replace_second = replace_nulls_strings<1, true>;
auto d_input = cudf::column_device_view::create(input, stream);
auto d_replacement = cudf::column_device_view::create(replacement, stream);

auto lhs_iter =
cudf::detail::make_optional_iterator<cudf::string_view>(*d_input, cudf::nullate::YES{});
auto rhs_iter = cudf::detail::make_optional_iterator<cudf::string_view>(
*d_replacement, cudf::nullate::DYNAMIC{replacement.nullable()});

auto filter = cudf::detail::validity_accessor<false>{*d_input};
auto result = cudf::strings::detail::copy_if_else(
lhs_iter, lhs_iter + input.size(), rhs_iter, filter, stream, mr);

// input is nullable so result should always be nullable here
if (!result->nullable()) {
result->set_null_mask(
cudf::detail::create_null_mask(input.size(), cudf::mask_state::ALL_VALID, stream, mr), 0);
}

// Create new offsets column to use in kernel
std::unique_ptr<cudf::column> sizes = cudf::make_numeric_column(
cudf::data_type(cudf::type_id::INT32), input.size(), cudf::mask_state::UNALLOCATED, stream);

auto sizes_view = sizes->mutable_view();
auto device_in = cudf::column_device_view::create(input, stream);
auto device_replacement = cudf::column_device_view::create(replacement, stream);

rmm::device_buffer valid_bits =
cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr);

// Call first pass kernel to get sizes in offsets
cudf::detail::grid_1d grid{input.size(), BLOCK_SIZE, 1};
replace_first<<<grid.num_blocks, BLOCK_SIZE, 0, stream.value()>>>(
*device_in,
*device_replacement,
reinterpret_cast<cudf::bitmask_type*>(valid_bits.data()),
sizes_view.begin<cudf::size_type>(),
nullptr,
valid_count);

auto [offsets, bytes] = cudf::detail::make_offsets_child_column(
sizes_view.begin<int32_t>(), sizes_view.end<int32_t>(), stream, mr);

auto offsets_view = offsets->mutable_view();

// Allocate chars array and output null mask
rmm::device_uvector<char> output_chars(bytes, stream, mr);

replace_second<<<grid.num_blocks, BLOCK_SIZE, 0, stream.value()>>>(
*device_in,
*device_replacement,
reinterpret_cast<cudf::bitmask_type*>(valid_bits.data()),
offsets_view.begin<cudf::size_type>(),
output_chars.data(),
valid_count);

return cudf::make_strings_column(input.size(),
std::move(offsets),
output_chars.release(),
input.size() - valid_counter.value(stream),
std::move(valid_bits));
return result;
}

template <>
Expand Down

0 comments on commit 13a5c7b

Please sign in to comment.