diff --git a/cpp/include/cudf/reduction.hpp b/cpp/include/cudf/reduction.hpp index 5adf89d1706..52f39925a2d 100644 --- a/cpp/include/cudf/reduction.hpp +++ b/cpp/include/cudf/reduction.hpp @@ -75,6 +75,7 @@ enum class scan_type : bool { INCLUSIVE, EXCLUSIVE }; * @param col Input column view * @param agg Aggregation operator applied by the reduction * @param output_dtype The output scalar type + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned scalar's device memory * @returns Output scalar with reduce result */ @@ -82,6 +83,7 @@ std::unique_ptr reduce( column_view const& col, reduce_aggregation const& agg, data_type output_dtype, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -96,6 +98,7 @@ std::unique_ptr reduce( * @param agg Aggregation operator applied by the reduction * @param output_dtype The output scalar type * @param init The initial value of the reduction + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned scalar's device memory * @returns Output scalar with reduce result */ @@ -104,6 +107,7 @@ std::unique_ptr reduce( reduce_aggregation const& agg, data_type output_dtype, std::optional> init, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -145,6 +149,7 @@ std::unique_ptr reduce( * @param null_handling If `INCLUDE`, the reduction is valid if all elements in a segment are valid, * otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid, * otherwise null. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned scalar's device memory * @returns Output column with results of segmented reduction */ @@ -154,6 +159,7 @@ std::unique_ptr segmented_reduce( segmented_reduce_aggregation const& agg, data_type output_dtype, null_policy null_handling, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -169,6 +175,7 @@ std::unique_ptr segmented_reduce( * otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid, * otherwise null. * @param init The initial value of the reduction + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned scalar's device memory * @returns Output column with results of segmented reduction. */ @@ -179,6 +186,7 @@ std::unique_ptr segmented_reduce( data_type output_dtype, null_policy null_handling, std::optional> init, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -195,6 +203,7 @@ std::unique_ptr segmented_reduce( * exclusive scan if scan_type::EXCLUSIVE. * @param[in] null_handling Exclude null values when computing the result if null_policy::EXCLUDE. * Include nulls if null_policy::INCLUDE. Any operation with a null results in a null. + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned scalar's device memory * @returns Scanned output column */ @@ -203,6 +212,7 @@ std::unique_ptr scan( scan_aggregation const& agg, scan_type inclusive, null_policy null_handling = null_policy::EXCLUDE, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -210,12 +220,14 @@ std::unique_ptr scan( * * * @param col column to compute minmax + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned column's device memory * @return A std::pair of scalars with the first scalar being the minimum value and the second * scalar being the maximum value of the input column. */ std::pair, std::unique_ptr> minmax( column_view const& col, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/src/reductions/minmax.cu b/cpp/src/reductions/minmax.cu index 62a1f4aab7c..2c1181972c5 100644 --- a/cpp/src/reductions/minmax.cu +++ b/cpp/src/reductions/minmax.cu @@ -275,10 +275,10 @@ std::pair, std::unique_ptr> minmax( } // namespace detail std::pair, std::unique_ptr> minmax( - column_view const& col, rmm::device_async_resource_ref mr) + column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::minmax(col, cudf::get_default_stream(), mr); + return detail::minmax(col, stream, mr); } } // namespace cudf diff --git a/cpp/src/reductions/reductions.cpp b/cpp/src/reductions/reductions.cpp index cde0274339a..8fa036a0949 100644 --- a/cpp/src/reductions/reductions.cpp +++ b/cpp/src/reductions/reductions.cpp @@ -208,20 +208,21 @@ std::unique_ptr reduce(column_view const& col, std::unique_ptr reduce(column_view const& col, reduce_aggregation const& agg, data_type output_dtype, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return reduction::detail::reduce( - col, agg, output_dtype, std::nullopt, cudf::get_default_stream(), mr); + return reduction::detail::reduce(col, agg, output_dtype, std::nullopt, stream, mr); } std::unique_ptr reduce(column_view const& col, reduce_aggregation const& agg, data_type output_dtype, std::optional> init, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return reduction::detail::reduce(col, agg, output_dtype, init, cudf::get_default_stream(), mr); + return reduction::detail::reduce(col, agg, output_dtype, init, stream, mr); } } // namespace cudf diff --git a/cpp/src/reductions/scan/scan.cpp b/cpp/src/reductions/scan/scan.cpp index b6e8690a6c9..de4dcf1de52 100644 --- a/cpp/src/reductions/scan/scan.cpp +++ b/cpp/src/reductions/scan/scan.cpp @@ -60,10 +60,11 @@ std::unique_ptr scan(column_view const& input, scan_aggregation const& agg, scan_type inclusive, null_policy null_handling, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::scan(input, agg, inclusive, null_handling, cudf::get_default_stream(), mr); + return detail::scan(input, agg, inclusive, null_handling, stream, mr); } } // namespace cudf diff --git a/cpp/src/reductions/segmented/reductions.cpp b/cpp/src/reductions/segmented/reductions.cpp index 1ae344dcace..48ab5963a29 100644 --- a/cpp/src/reductions/segmented/reductions.cpp +++ b/cpp/src/reductions/segmented/reductions.cpp @@ -138,17 +138,12 @@ std::unique_ptr segmented_reduce(column_view const& segmented_values, segmented_reduce_aggregation const& agg, data_type output_dtype, null_policy null_handling, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return reduction::detail::segmented_reduce(segmented_values, - offsets, - agg, - output_dtype, - null_handling, - std::nullopt, - cudf::get_default_stream(), - mr); + return reduction::detail::segmented_reduce( + segmented_values, offsets, agg, output_dtype, null_handling, std::nullopt, stream, mr); } std::unique_ptr segmented_reduce(column_view const& segmented_values, @@ -157,17 +152,12 @@ std::unique_ptr segmented_reduce(column_view const& segmented_values, data_type output_dtype, null_policy null_handling, std::optional> init, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return reduction::detail::segmented_reduce(segmented_values, - offsets, - agg, - output_dtype, - null_handling, - init, - cudf::get_default_stream(), - mr); + return reduction::detail::segmented_reduce( + segmented_values, offsets, agg, output_dtype, null_handling, init, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e779e1d1410..c2982c478cd 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -695,6 +695,7 @@ ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing) ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing) ConfigureTest( STREAM_STRINGS_TEST streams/strings/case_test.cpp diff --git a/cpp/tests/streams/reduction_test.cpp b/cpp/tests/streams/reduction_test.cpp new file mode 100644 index 00000000000..53dd1eed459 --- /dev/null +++ b/cpp/tests/streams/reduction_test.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +class ReductionTest : public cudf::test::BaseFixture {}; + +TEST_F(ReductionTest, ReductionSum) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + cudf::reduce(input, + *cudf::make_sum_aggregation(), + cudf::data_type(cudf::type_id::INT32), + cudf::test::get_default_stream()); +} + +TEST_F(ReductionTest, ReductionSumScalarInit) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); + auto const init_scalar = cudf::make_fixed_width_scalar(3, cudf::test::get_default_stream()); + cudf::reduce(input, + *cudf::make_sum_aggregation(), + cudf::data_type(cudf::type_id::INT32), + *init_scalar, + cudf::test::get_default_stream()); +} + +TEST_F(ReductionTest, SegmentedReductionSum) +{ + auto const input = cudf::test::fixed_width_column_wrapper{{1, 2, 3, 1, 0, 3, 1, 0, 0, 0}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}}; + auto const offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto const d_offsets = cudf::detail::make_device_uvector_async( + offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); + + auto res = + cudf::segmented_reduce(input, + d_offsets, + *cudf::make_sum_aggregation(), + cudf::data_type(cudf::type_id::INT32), + cudf::null_policy::EXCLUDE, + cudf::test::get_default_stream()); +} + +TEST_F(ReductionTest, SegmentedReductionSumScalarInit) +{ + auto const input = cudf::test::fixed_width_column_wrapper{{1, 2, 3, 1, 0, 3, 1, 0, 0, 0}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}}; + auto const offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto const d_offsets = cudf::detail::make_device_uvector_async( + offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); + auto const init_scalar = cudf::make_fixed_width_scalar(3, cudf::test::get_default_stream()); + auto res = + cudf::segmented_reduce(input, + d_offsets, + *cudf::make_sum_aggregation(), + cudf::data_type(cudf::type_id::INT32), + cudf::null_policy::EXCLUDE, + *init_scalar, + cudf::test::get_default_stream()); +} + +TEST_F(ReductionTest, ScanMin) +{ + auto const input = cudf::test::fixed_width_column_wrapper{ + {123, 64, 63, 99, -5, 123, -16, -120, -111}, {1, 0, 1, 1, 1, 1, 0, 0, 1}}; + + cudf::scan(input, + *cudf::make_min_aggregation(), + cudf::scan_type::INCLUSIVE, + cudf::null_policy::EXCLUDE, + cudf::test::get_default_stream()); +} + +TEST_F(ReductionTest, MinMax) +{ + auto const input = cudf::test::fixed_width_column_wrapper{ + {123, 64, 63, 99, -5, 123, -16, -120, -111}, {1, 0, 1, 1, 1, 1, 0, 0, 1}}; + + cudf::minmax(input, cudf::test::get_default_stream()); +}