Skip to content

Commit

Permalink
Expose streams in public replace APIs (#14010)
Browse files Browse the repository at this point in the history
Contributes to #925

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - MithunR (https://github.com/mythrocks)
  - David Wendt (https://github.com/davidwendt)

URL: #14010
  • Loading branch information
vyasr authored Sep 1, 2023
1 parent d1fb671 commit 2b7294b
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 45 deletions.
25 changes: 23 additions & 2 deletions cpp/include/cudf/replace.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
#pragma once

#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/mr/device/per_device_resource.hpp>

Expand Down Expand Up @@ -45,6 +46,7 @@ enum class replace_policy : bool { PRECEDING, FOLLOWING };
*
* @param[in] input A column whose null values will be replaced
* @param[in] replacement A cudf::column whose values will replace null values in input
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate device memory of the returned column
*
* @returns A copy of `input` with the null values replaced with corresponding values from
Expand All @@ -53,6 +55,7 @@ enum class replace_policy : bool { PRECEDING, FOLLOWING };
std::unique_ptr<column> replace_nulls(
column_view const& input,
column_view const& replacement,
// `stream` is positioned before `mr`. Both are defaulted, so two-argument calls are unaffected,
// but a caller that supplies `mr` positionally must pass a stream ahead of it.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -63,13 +66,15 @@ std::unique_ptr<column> replace_nulls(
*
* @param[in] input A column whose null values will be replaced
* @param[in] replacement Scalar used to replace null values in `input`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate device memory of the returned column
*
* @returns Copy of `input` with null values replaced by `replacement`
*/
std::unique_ptr<column> replace_nulls(
column_view const& input,
scalar const& replacement,
// Defaulted stream comes before the defaulted memory resource; supplying `mr` positionally
// therefore requires supplying a stream as the third argument.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -80,13 +85,15 @@ std::unique_ptr<column> replace_nulls(
*
* @param[in] input A column whose null values will be replaced
* @param[in] replace_policy Specify the position of replacement values relative to null values
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate device memory of the returned column
*
* @returns Copy of `input` with null values replaced based on `replace_policy`
*/
std::unique_ptr<column> replace_nulls(
column_view const& input,
// Note: the parameter is named identically to its enum type `replace_policy`, so within this
// declaration's parameter scope the name refers to the parameter rather than the type.
replace_policy const& replace_policy,
// Defaulted stream precedes the defaulted memory resource in positional order.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -106,13 +113,15 @@ std::unique_ptr<column> replace_nulls(
*
* @param input A column whose NaN values will be replaced
* @param replacement A cudf::column whose values will replace NaN values in input
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return A copy of `input` with the NaN values replaced with corresponding values from
* `replacement`.
*/
std::unique_ptr<column> replace_nans(
column_view const& input,
column_view const& replacement,
// `stream` is the third positional argument and `mr` the fourth; both have defaults, so
// callers passing only `input` and `replacement` need no change.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -132,12 +141,14 @@ std::unique_ptr<column> replace_nans(
*
* @param input A column whose NaN values will be replaced
* @param replacement A cudf::scalar whose value will replace NaN values in input
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return A copy of `input` with the NaN values replaced by `replacement`
*/
std::unique_ptr<column> replace_nans(
column_view const& input,
scalar const& replacement,
// Stream is ordered ahead of the memory resource; pass a stream explicitly when `mr` must be
// given positionally.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -147,6 +158,7 @@ std::unique_ptr<column> replace_nans(
* @param input_col The column to find and replace values in
* @param values_to_replace The values to replace
* @param replacement_values The values to replace with
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
*
* @returns Copy of `input_col` with specified values replaced
Expand All @@ -155,6 +167,7 @@ std::unique_ptr<column> find_and_replace_all(
column_view const& input_col,
column_view const& values_to_replace,
column_view const& replacement_values,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -198,6 +211,7 @@ std::unique_ptr<column> find_and_replace_all(
* @param[in] hi Maximum clamp value. All elements greater than `hi` will be replaced by
* `hi_replace`. Ignored if null.
* @param[in] hi_replace All elements greater than `hi` will be replaced by `hi_replace`
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate device memory of the returned column
*
* @return Returns a clamped column as per `lo` and `hi` boundaries
Expand All @@ -208,6 +222,7 @@ std::unique_ptr<column> clamp(
scalar const& lo_replace,
scalar const& hi,
scalar const& hi_replace,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -244,6 +259,7 @@ std::unique_ptr<column> clamp(
* if null.
* @param[in] hi Maximum clamp value. All elements greater than `hi` will be replaced by `hi`
* Ignored if null.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate device memory of the returned column
*
* @return Returns a clamped column as per `lo` and `hi` boundaries
Expand All @@ -252,6 +268,7 @@ std::unique_ptr<column> clamp(
column_view const& input,
scalar const& lo,
scalar const& hi,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -264,12 +281,14 @@ std::unique_ptr<column> clamp(
*
* @throws cudf::logic_error if column does not have floating point data type.
* @param[in] input column_view of floating-point elements to copy and normalize
* @param stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr device_memory_resource allocator for allocating output data
*
* @returns new column with the modified data
*/
std::unique_ptr<column> normalize_nans_and_zeros(
column_view const& input,
// Copying overload: takes a defaulted stream followed by a defaulted memory resource for the
// returned column, so a single-argument call remains valid.
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -282,8 +301,10 @@ std::unique_ptr<column> normalize_nans_and_zeros(
*
* @throws cudf::logic_error if column does not have floating point data type.
* @param[in, out] in_out of floating-point elements to normalize
* @param stream CUDA stream used for device memory operations and kernel launches
*/
// In-place overload. Only the stream-taking declaration is kept: a second declaration without
// the stream parameter would make one-argument calls ambiguous against the defaulted `stream`.
void normalize_nans_and_zeros(mutable_column_view& in_out,
                              rmm::cuda_stream_view stream = cudf::get_default_stream());

/** @} */ // end of group
} // namespace cudf
6 changes: 4 additions & 2 deletions cpp/src/replace/clamp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -386,19 +386,21 @@ std::unique_ptr<column> clamp(column_view const& input,
scalar const& lo_replace,
scalar const& hi,
scalar const& hi_replace,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::clamp(input, lo, lo_replace, hi, hi_replace, cudf::get_default_stream(), mr);
return detail::clamp(input, lo, lo_replace, hi, hi_replace, stream, mr);
}

/**
 * @brief Clamps `input` between `lo` and `hi`, reusing each bound as its own replacement value.
 *
 * Thin public wrapper: forwards to `detail::clamp` with `lo` supplied as both the lower bound
 * and its replacement, and `hi` supplied as both the upper bound and its replacement.
 *
 * @param input Column to clamp
 * @param lo Lower bound, also passed as the replacement for the lower bound
 * @param hi Upper bound, also passed as the replacement for the upper bound
 * @param stream CUDA stream forwarded to `detail::clamp`
 * @param mr Device memory resource forwarded to `detail::clamp`
 * @return The column produced by `detail::clamp`
 */
std::unique_ptr<column> clamp(column_view const& input,
                              scalar const& lo,
                              scalar const& hi,
                              rmm::cuda_stream_view stream,
                              rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Forward the caller's stream rather than substituting cudf::get_default_stream().
  return detail::clamp(input, lo, lo, hi, hi, stream, mr);
}
} // namespace cudf
15 changes: 9 additions & 6 deletions cpp/src/replace/nans.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -111,18 +111,20 @@ std::unique_ptr<column> replace_nans(column_view const& input,

/**
 * @brief Public entry point for replacing values in `input` using a replacement column.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::replace_nans`.
 *
 * @param input Column to process
 * @param replacement Column supplying the replacement values
 * @param stream CUDA stream forwarded to `detail::replace_nans`
 * @param mr Device memory resource forwarded to `detail::replace_nans`
 * @return The column produced by `detail::replace_nans`
 */
std::unique_ptr<column> replace_nans(column_view const& input,
                                     column_view const& replacement,
                                     rmm::cuda_stream_view stream,
                                     rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Use the stream the caller supplied; the default stream must not be hard-coded here.
  return detail::replace_nans(input, replacement, stream, mr);
}

/**
 * @brief Public entry point for replacing values in `input` using a replacement scalar.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::replace_nans`.
 *
 * @param input Column to process
 * @param replacement Scalar supplying the replacement value
 * @param stream CUDA stream forwarded to `detail::replace_nans`
 * @param mr Device memory resource forwarded to `detail::replace_nans`
 * @return The column produced by `detail::replace_nans`
 */
std::unique_ptr<column> replace_nans(column_view const& input,
                                     scalar const& replacement,
                                     rmm::cuda_stream_view stream,
                                     rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Use the stream the caller supplied; the default stream must not be hard-coded here.
  return detail::replace_nans(input, replacement, stream, mr);
}

} // namespace cudf
Expand Down Expand Up @@ -202,7 +204,7 @@ std::unique_ptr<column> normalize_nans_and_zeros(column_view const& input,

// from device. unique_ptr which gets automatically cleaned up when we leave.
auto out_view = out->mutable_view();
normalize_nans_and_zeros(out_view, stream);
detail::normalize_nans_and_zeros(out_view, stream);
out->set_null_count(input.null_count());

return out;
Expand All @@ -221,10 +223,11 @@ std::unique_ptr<column> normalize_nans_and_zeros(column_view const& input,
* @param mr Device memory resource used to allocate the returned column's device memory.
*/
std::unique_ptr<column> normalize_nans_and_zeros(column_view const& input,
                                                 rmm::cuda_stream_view stream,
                                                 rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Public wrapper: hand the caller-provided stream and memory resource straight to the detail
  // implementation instead of falling back to cudf::get_default_stream().
  return detail::normalize_nans_and_zeros(input, stream, mr);
}

/**
Expand All @@ -237,7 +240,7 @@ std::unique_ptr<column> normalize_nans_and_zeros(column_view const& input,
* @throws cudf::logic_error if column does not have floating point data type.
* @param[in, out] in_out mutable_column_view representing input data. data is processed in-place
*/
void normalize_nans_and_zeros(mutable_column_view& in_out)
void normalize_nans_and_zeros(mutable_column_view& in_out, rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
detail::normalize_nans_and_zeros(in_out, cudf::get_default_stream());
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/replace/nulls.cu
Original file line number Diff line number Diff line change
Expand Up @@ -446,26 +446,29 @@ std::unique_ptr<cudf::column> replace_nulls(cudf::column_view const& input,

/**
 * @brief Public entry point for replacing nulls in `input` with values from a column.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::replace_nulls`.
 *
 * @param input Column to process
 * @param replacement Column supplying the replacement values
 * @param stream CUDA stream forwarded to `detail::replace_nulls`
 * @param mr Device memory resource forwarded to `detail::replace_nulls`
 * @return The column produced by `detail::replace_nulls`
 */
std::unique_ptr<cudf::column> replace_nulls(cudf::column_view const& input,
                                            cudf::column_view const& replacement,
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Honor the caller's stream instead of hard-coding cudf::get_default_stream().
  return detail::replace_nulls(input, replacement, stream, mr);
}

/**
 * @brief Public entry point for replacing nulls in `input` with a scalar value.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::replace_nulls`.
 *
 * @param input Column to process
 * @param replacement Scalar supplying the replacement value
 * @param stream CUDA stream forwarded to `detail::replace_nulls`
 * @param mr Device memory resource forwarded to `detail::replace_nulls`
 * @return The column produced by `detail::replace_nulls`
 */
std::unique_ptr<cudf::column> replace_nulls(cudf::column_view const& input,
                                            cudf::scalar const& replacement,
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Honor the caller's stream instead of hard-coding cudf::get_default_stream().
  return detail::replace_nulls(input, replacement, stream, mr);
}

/**
 * @brief Public entry point for replacing nulls in `input` according to a replacement policy.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::replace_nulls`.
 *
 * @param input Column to process
 * @param replace_policy Policy value passed through to `detail::replace_nulls`
 * @param stream CUDA stream forwarded to `detail::replace_nulls`
 * @param mr Device memory resource forwarded to `detail::replace_nulls`
 * @return The column produced by `detail::replace_nulls`
 */
std::unique_ptr<cudf::column> replace_nulls(column_view const& input,
                                            replace_policy const& replace_policy,
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();
  // Honor the caller's stream instead of hard-coding cudf::get_default_stream().
  return detail::replace_nulls(input, replace_policy, stream, mr);
}

} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/replace/replace.cu
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,9 @@ std::unique_ptr<cudf::column> find_and_replace_all(cudf::column_view const& inpu
/**
 * @brief Public entry point for replacing listed values in `input_col`.
 *
 * Thin wrapper that forwards every argument, including the caller's stream, to
 * `detail::find_and_replace_all`.
 *
 * @param input_col Column to find and replace values in
 * @param values_to_replace Values to look for
 * @param replacement_values Values to substitute
 * @param stream CUDA stream forwarded to `detail::find_and_replace_all`
 * @param mr Device memory resource forwarded to `detail::find_and_replace_all`
 * @return The column produced by `detail::find_and_replace_all`
 */
std::unique_ptr<cudf::column> find_and_replace_all(cudf::column_view const& input_col,
                                                   cudf::column_view const& values_to_replace,
                                                   cudf::column_view const& replacement_values,
                                                   rmm::cuda_stream_view stream,
                                                   rmm::mr::device_memory_resource* mr)
{
  // NOTE(review): the other public wrappers shown in this commit open with CUDF_FUNC_RANGE();
  // this one does not. Confirm whether that omission is intentional.
  return detail::find_and_replace_all(input_col, values_to_replace, replacement_values, stream, mr);
}
} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ ConfigureTest(STREAM_COPYING_TEST streams/copying_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_GROUPBY_TEST streams/groupby_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_CONCATENATE_TEST streams/concatenate_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_FILLING_TEST streams/filling_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)

# ##################################################################################################
# Install tests ####################################################################################
Expand Down
23 changes: 10 additions & 13 deletions cpp/tests/replace/replace_nulls_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ TEST_F(ReplaceErrorTest, SizeMismatch)
{0, 0, 1, 1, 1, 1, 1, 1}};
cudf::test::fixed_width_column_wrapper<int32_t> values_to_replace_column{{10, 11, 12, 13}};

ASSERT_THROW(cudf::replace_nulls(input_column, values_to_replace_column, mr()),
cudf::logic_error);
ASSERT_THROW(cudf::replace_nulls(input_column, values_to_replace_column), cudf::logic_error);
}

// Error: column type mismatch
Expand All @@ -58,8 +57,7 @@ TEST_F(ReplaceErrorTest, TypeMismatch)
cudf::test::fixed_width_column_wrapper<float> values_to_replace_column{
{10, 11, 12, 13, 14, 15, 16, 17}};

EXPECT_THROW(cudf::replace_nulls(input_column, values_to_replace_column, mr()),
cudf::logic_error);
EXPECT_THROW(cudf::replace_nulls(input_column, values_to_replace_column), cudf::logic_error);
}

// Error: column type mismatch
Expand All @@ -69,7 +67,7 @@ TEST_F(ReplaceErrorTest, TypeMismatchScalar)
{0, 0, 1, 1, 1, 1, 1, 1}};
cudf::numeric_scalar<float> replacement(1);

EXPECT_THROW(cudf::replace_nulls(input_column, replacement, mr()), cudf::logic_error);
EXPECT_THROW(cudf::replace_nulls(input_column, replacement), cudf::logic_error);
}

struct ReplaceNullsStringsTest : public cudf::test::BaseFixture {};
Expand All @@ -88,7 +86,7 @@ TEST_F(ReplaceNullsStringsTest, SimpleReplace)
replacement.begin(), replacement.end(), replacement_v.begin()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_w);
}
Expand All @@ -107,7 +105,7 @@ TEST_F(ReplaceNullsStringsTest, ReplaceWithNulls)
replacement.begin(), replacement.end(), replacement_v.begin()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_w);
}
Expand All @@ -125,7 +123,7 @@ TEST_F(ReplaceNullsStringsTest, ReplaceWithAllNulls)
cudf::test::strings_column_wrapper expected_w{input.begin(), input.end(), input_v.begin()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_w);
}
Expand All @@ -143,7 +141,7 @@ TEST_F(ReplaceNullsStringsTest, ReplaceWithAllEmpty)
cudf::test::strings_column_wrapper expected_w{input.begin(), input.end(), replacement_v.begin()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_w);
}
Expand All @@ -161,7 +159,7 @@ TEST_F(ReplaceNullsStringsTest, ReplaceNone)
cudf::test::strings_column_wrapper expected_w{input.begin(), input.end()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, replacement_w));

CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*result, expected_w);
}
Expand All @@ -170,16 +168,15 @@ TEST_F(ReplaceNullsStringsTest, SimpleReplaceScalar)
{
std::vector<std::string> input{"", "", "", "", "", "", "", ""};
std::vector<cudf::valid_type> input_v{0, 0, 0, 0, 0, 0, 0, 0};
std::unique_ptr<cudf::scalar> repl =
cudf::make_string_scalar("rep", cudf::get_default_stream(), mr());
std::unique_ptr<cudf::scalar> repl = cudf::make_string_scalar("rep");
repl->set_valid_async(true, cudf::get_default_stream());
std::vector<std::string> expected{"rep", "rep", "rep", "rep", "rep", "rep", "rep", "rep"};

cudf::test::strings_column_wrapper input_w{input.begin(), input.end(), input_v.begin()};
cudf::test::strings_column_wrapper expected_w{expected.begin(), expected.end()};

std::unique_ptr<cudf::column> result;
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, *repl, mr()));
ASSERT_NO_THROW(result = cudf::replace_nulls(input_w, *repl));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, expected_w);
}
Expand Down
Loading

0 comments on commit 2b7294b

Please sign in to comment.