From af40eb71bdd3d73933f75710bdf9018bd015fe5f Mon Sep 17 00:00:00 2001 From: srinivasyadav18 Date: Sun, 26 May 2024 20:41:18 +0000 Subject: [PATCH 1/3] Expose stream parameter to public rolling APIs --- cpp/include/cudf/rolling.hpp | 20 +++ cpp/src/rolling/grouped_rolling.cu | 129 ++++++++------- cpp/src/rolling/rolling.cu | 17 +- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/rolling_test.cpp | 244 +++++++++++++++++++++++++++++ 5 files changed, 342 insertions(+), 69 deletions(-) create mode 100644 cpp/tests/streams/rolling_test.cpp diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index 2cd34f48265..d55322dd3e8 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -57,6 +57,7 @@ namespace cudf { * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] agg The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -67,6 +68,7 @@ std::unique_ptr rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -77,6 +79,7 @@ std::unique_ptr rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& agg, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) * * @param default_outputs A column of per-row default values to be returned instead @@ -90,6 +93,7 @@ std::unique_ptr rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -227,6 +231,7 @@ struct window_bounds { * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -238,6 +243,7 @@ std::unique_ptr grouped_rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -249,6 +255,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) */ std::unique_ptr grouped_rolling_window( @@ -258,6 +265,7 @@ std::unique_ptr grouped_rolling_window( window_bounds following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -269,6 +277,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream,, * rmm::device_async_resource_ref mr) * * @param default_outputs A column of per-row default values to be returned instead @@ -283,6 +292,7 @@ std::unique_ptr grouped_rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -295,6 +305,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) */ std::unique_ptr grouped_rolling_window( @@ -305,6 +316,7 @@ std::unique_ptr grouped_rolling_window( window_bounds following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -387,6 +399,7 @@ std::unique_ptr grouped_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -400,6 +413,7 @@ std::unique_ptr grouped_time_range_rolling_window( size_type following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -415,6 +429,7 @@ std::unique_ptr grouped_time_range_rolling_window( * size_type following_window_in_days, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) * * The `preceding_window_in_days` and `following_window_in_days` are specified as a `window_bounds` @@ -429,6 +444,7 @@ std::unique_ptr grouped_time_range_rolling_window( window_bounds following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -536,6 +552,7 @@ std::unique_ptr grouped_time_range_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -549,6 +566,7 @@ std::unique_ptr grouped_range_rolling_window( range_window_bounds const& following, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -582,6 +600,7 @@ std::unique_ptr grouped_range_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] agg The rolling window aggregation type (sum, max, min, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -592,6 +611,7 @@ std::unique_ptr rolling_window( column_view const& following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index d461ed7a109..9c1cb48e97e 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -40,59 +40,6 @@ #include namespace cudf { -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - size_type preceding_window, - size_type following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - window_bounds::get(preceding_window), - window_bounds::get(following_window), - min_periods, - aggr, - mr); -} - -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - window_bounds preceding_window, - window_bounds following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - empty_like(input)->view(), - preceding_window, - following_window, - min_periods, - aggr, - mr); -} - -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - column_view const& default_outputs, - size_type preceding_window, - size_type following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - default_outputs, - window_bounds::get(preceding_window), - window_bounds::get(following_window), - min_periods, - aggr, - mr); -} namespace detail { @@ -237,8 +184,8 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, if (group_keys.num_columns() == 0) { // No Groupby columns specified. Treat as one big group. - return rolling_window( - input, default_outputs, preceding_window, following_window, min_periods, aggr, mr); + return detail::rolling_window( + input, default_outputs, preceding_window, following_window, min_periods, aggr, stream, mr); } using sort_groupby_helper = cudf::groupby::detail::sort::sort_groupby_helper; @@ -306,6 +253,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, window_bounds following_window_bounds, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { return detail::grouped_rolling_window(group_keys, @@ -315,7 +263,67 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, following_window_bounds, min_periods, aggr, - cudf::get_default_stream(), + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + size_type preceding_window, + size_type following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return grouped_rolling_window(group_keys, + input, + window_bounds::get(preceding_window), + window_bounds::get(following_window), + min_periods, + aggr, + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + window_bounds preceding_window, + window_bounds following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return detail::grouped_rolling_window(group_keys, + input, + empty_like(input)->view(), + preceding_window, + following_window, + min_periods, + aggr, + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return detail::grouped_rolling_window(group_keys, + input, + default_outputs, + window_bounds::get(preceding_window), + window_bounds::get(following_window), + min_periods, + aggr, + stream, mr); } @@ -1199,6 +1207,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou size_type following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1213,7 +1222,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } @@ -1237,6 +1246,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou window_bounds following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1253,7 +1263,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } @@ -1277,6 +1287,7 @@ std::unique_ptr grouped_range_rolling_window(table_view const& group_key range_window_bounds const& following, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1288,7 +1299,7 @@ std::unique_ptr grouped_range_rolling_window(table_view const& group_key following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index a308ed8a7a6..e612bd01118 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -32,17 +32,12 @@ std::unique_ptr rolling_window(column_view const& input, size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::rolling_window(input, - default_outputs, - preceding_window, - following_window, - min_periods, - agg, - cudf::get_default_stream(), - mr); + return detail::rolling_window( + input, default_outputs, preceding_window, following_window, min_periods, agg, stream, mr); } // Applies a fixed-size rolling window function to the values in a column, without default specified @@ -51,6 +46,7 @@ std::unique_ptr rolling_window(column_view const& input, size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -62,7 +58,7 @@ std::unique_ptr rolling_window(column_view const& input, following_window, min_periods, agg, - cudf::get_default_stream(), + stream, mr); } @@ -72,11 +68,12 @@ std::unique_ptr rolling_window(column_view const& input, column_view const& following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::rolling_window( - input, preceding_window, following_window, min_periods, agg, cudf::get_default_stream(), mr); + input, preceding_window, following_window, min_periods, agg, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 826f879ddc0..f25339595f0 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -696,6 +696,7 @@ ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_ROLLING_TEST streams/rolling_test.cpp STREAM_MODE testing) ConfigureTest( STREAM_STRINGS_TEST streams/strings/case_test.cpp diff --git a/cpp/tests/streams/rolling_test.cpp b/cpp/tests/streams/rolling_test.cpp new file mode 100644 index 00000000000..45057bb359a --- /dev/null +++ b/cpp/tests/streams/rolling_test.cpp @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +class RollingTest : public cudf::test::BaseFixture {}; + +TEST_F(RollingTest, FixedSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::rolling_window(input, + 2, + 3, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(RollingTest, FixedSizeDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + cudf::rolling_window(input, + defaults, + 2, + 3, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +TEST_F(RollingTest, VariableSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + cudf::test::fixed_width_column_wrapper preceding({2, 2, 2, 2, 3, 3, 3, 3, 3}); + cudf::test::fixed_width_column_wrapper following({3, 3, 3, 3, 3, 2, 2, 2, 2}); + + cudf::rolling_window(input, + preceding, + following, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +class GroupedRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedRollingTest, FixedSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + 2, + 3, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, FixedSizeDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + defaults, + 2, + 3, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, WindowBounds) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + unbounded_preceding, + following, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, WindowBoundsDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + defaults, + unbounded_preceding, + following, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +class GroupedTimeRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedTimeRollingTest, FixedSize) +{ + auto const grp_col = + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + auto const time_col = + cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + auto const preceding = 1L; + auto const following = 1L; + auto const min_periods = 1L; + cudf::grouped_time_range_rolling_window( + grouping_keys, + time_col, + cudf::order::ASCENDING, + agg_col, + preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedTimeRollingTest, WindowBounds) +{ + auto const grp_col = + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + auto const time_col = + cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + auto const min_periods = 1L; + cudf::grouped_time_range_rolling_window( + grouping_keys, + time_col, + cudf::order::ASCENDING, + agg_col, + unbounded_preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +} + +class GroupedRangeRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedRangeRollingTest, RangeWindowBounds) +{ + auto const grp_col = cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + + auto const order_by = cudf::test::fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + cudf::range_window_bounds preceding = cudf::range_window_bounds::get( + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + + cudf::range_window_bounds following = cudf::range_window_bounds::get( + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + + auto const min_periods = cudf::size_type{1}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + + cudf::grouped_range_rolling_window(grouping_keys, + order_by, + cudf::order::ASCENDING, + agg_col, + preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +} From 88af8b43e78c265a3c20ff03810c47f4d71e6c99 Mon Sep 17 00:00:00 2001 From: srinivasyadav18 Date: Fri, 7 Jun 2024 02:21:22 +0000 Subject: [PATCH 2/3] update range_window_bounds API to accept stream parameter range_window_bounds API functions uses scalars internally, which should also the same stream parameter. Signed-off-by: srinivasyadav18 --- .../cudf/rolling/range_window_bounds.hpp | 16 ++++++-- cpp/src/rolling/grouped_rolling.cu | 29 ++++++++------ cpp/src/rolling/range_window_bounds.cpp | 39 +++++++++++-------- cpp/tests/streams/rolling_test.cpp | 6 ++- 4 files changed, 56 insertions(+), 34 deletions(-) diff --git a/cpp/include/cudf/rolling/range_window_bounds.hpp b/cpp/include/cudf/rolling/range_window_bounds.hpp index 81885ade2f0..a9ee12cea27 100644 --- a/cpp/include/cudf/rolling/range_window_bounds.hpp +++ b/cpp/include/cudf/rolling/range_window_bounds.hpp @@ -56,18 +56,22 @@ struct range_window_bounds { * @brief Factory method to construct a bounded window boundary. * * @param boundary Finite window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return A bounded window boundary object */ - static range_window_bounds get(scalar const& boundary); + static range_window_bounds get(scalar const& boundary, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Factory method to construct a window boundary * limited to the value of the current row * * @param type The datatype of the window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return A "current row" window boundary object */ - static range_window_bounds current_row(data_type type); + static range_window_bounds current_row(data_type type, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Whether or not the window is bounded to the current row @@ -81,9 +85,11 @@ struct range_window_bounds { * @brief Factory method to construct an unbounded window boundary. * * @param type The datatype of the window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return An unbounded window boundary object */ - static range_window_bounds unbounded(data_type type); + static range_window_bounds unbounded(data_type type, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Whether or not the window is unbounded @@ -107,7 +113,9 @@ struct range_window_bounds { extent_type _extent{extent_type::UNBOUNDED}; std::shared_ptr _range_scalar{nullptr}; // To enable copy construction/assignment. - range_window_bounds(extent_type extent_, std::unique_ptr range_scalar_); + range_window_bounds(extent_type extent_, + std::unique_ptr range_scalar_, + rmm::cuda_stream_view = cudf::get_default_stream()); }; /** @} */ // end of group diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 9c1cb48e97e..1158bf22494 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -1055,14 +1055,15 @@ struct dispatch_grouped_range_rolling_window { */ struct to_duration_bounds { template (), void>* = nullptr> - range_window_bounds operator()(size_type num_days) const + range_window_bounds operator()(size_type num_days, rmm::cuda_stream_view stream) const { using DurationT = typename OrderBy::duration; - return range_window_bounds::get(duration_scalar{duration_D{num_days}, true}); + return range_window_bounds::get(duration_scalar{duration_D{num_days}, true, stream}, + stream); } template (), void>* = nullptr> - range_window_bounds operator()(size_type) const + range_window_bounds operator()(size_type, rmm::cuda_stream_view) const { CUDF_FAIL("Expected timestamp orderby column."); } @@ -1093,9 +1094,11 @@ data_type get_duration_type_for(cudf::data_type timestamp_type) * @param timestamp_type Data-type of the orderby column to which the `num_days` is to be adapted. * @return range_window_bounds A `range_window_bounds` to be used with the new API. */ -range_window_bounds to_range_bounds(cudf::size_type num_days, cudf::data_type timestamp_type) +range_window_bounds to_range_bounds(cudf::size_type num_days, + cudf::data_type timestamp_type, + rmm::cuda_stream_view stream) { - return cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, num_days); + return cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, num_days, stream); } /** @@ -1109,11 +1112,13 @@ range_window_bounds to_range_bounds(cudf::size_type num_days, cudf::data_type ti * @return range_window_bounds A `range_window_bounds` to be used with the new API. */ range_window_bounds to_range_bounds(cudf::window_bounds const& days_bounds, - cudf::data_type timestamp_type) + cudf::data_type timestamp_type, + rmm::cuda_stream_view stream) { return days_bounds.is_unbounded() - ? range_window_bounds::unbounded(get_duration_type_for(timestamp_type)) - : cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, days_bounds.value()); + ? range_window_bounds::unbounded(get_duration_type_for(timestamp_type), stream) + : cudf::type_dispatcher( + timestamp_type, to_duration_bounds{}, days_bounds.value(), stream); } } // namespace @@ -1211,8 +1216,8 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto preceding = to_range_bounds(preceding_window_in_days, timestamp_column.type()); - auto following = to_range_bounds(following_window_in_days, timestamp_column.type()); + auto preceding = to_range_bounds(preceding_window_in_days, timestamp_column.type(), stream); + auto following = to_range_bounds(following_window_in_days, timestamp_column.type(), stream); return detail::grouped_range_rolling_window(group_keys, timestamp_column, @@ -1251,9 +1256,9 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou { CUDF_FUNC_RANGE(); range_window_bounds preceding = - to_range_bounds(preceding_window_in_days, timestamp_column.type()); + to_range_bounds(preceding_window_in_days, timestamp_column.type(), stream); range_window_bounds following = - to_range_bounds(following_window_in_days, timestamp_column.type()); + to_range_bounds(following_window_in_days, timestamp_column.type(), stream); return detail::grouped_range_rolling_window(group_keys, timestamp_column, diff --git a/cpp/src/rolling/range_window_bounds.cpp b/cpp/src/rolling/range_window_bounds.cpp index 68e80c6e84e..69792136c64 100644 --- a/cpp/src/rolling/range_window_bounds.cpp +++ b/cpp/src/rolling/range_window_bounds.cpp @@ -32,7 +32,8 @@ namespace { */ struct range_scalar_constructor { template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { CUDF_FAIL( "Unsupported range type. " @@ -40,51 +41,57 @@ struct range_scalar_constructor { } template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { return std::make_unique>( - static_cast const&>(range_scalar_)); + static_cast const&>(range_scalar_), stream); } template () && not cudf::is_boolean())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { - return std::make_unique>( - static_cast const&>(range_scalar_)); + return std::make_unique>(static_cast const&>(range_scalar_), + stream); } template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { return std::make_unique>( - static_cast const&>(range_scalar_)); + static_cast const&>(range_scalar_), stream); } }; } // namespace -range_window_bounds::range_window_bounds(extent_type extent_, std::unique_ptr range_scalar_) +range_window_bounds::range_window_bounds(extent_type extent_, + std::unique_ptr range_scalar_, + rmm::cuda_stream_view stream) : _extent{extent_}, _range_scalar{std::move(range_scalar_)} { CUDF_EXPECTS(_range_scalar.get(), "Range window scalar cannot be null."); CUDF_EXPECTS(_extent == extent_type::UNBOUNDED || _extent == extent_type::CURRENT_ROW || - _range_scalar->is_valid(), + _range_scalar->is_valid(stream), "Bounded Range window scalar must be valid."); } -range_window_bounds range_window_bounds::unbounded(data_type type) +range_window_bounds range_window_bounds::unbounded(data_type type, rmm::cuda_stream_view stream) { - return {extent_type::UNBOUNDED, make_default_constructed_scalar(type)}; + return {extent_type::UNBOUNDED, make_default_constructed_scalar(type, stream), stream}; } -range_window_bounds range_window_bounds::current_row(data_type type) +range_window_bounds range_window_bounds::current_row(data_type type, rmm::cuda_stream_view stream) { - return {extent_type::CURRENT_ROW, make_default_constructed_scalar(type)}; + return {extent_type::CURRENT_ROW, make_default_constructed_scalar(type, stream), stream}; } -range_window_bounds range_window_bounds::get(scalar const& boundary) +range_window_bounds range_window_bounds::get(scalar const& boundary, rmm::cuda_stream_view stream) { return {extent_type::BOUNDED, - cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary)}; + cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary, stream), + stream}; } } // namespace cudf diff --git a/cpp/tests/streams/rolling_test.cpp b/cpp/tests/streams/rolling_test.cpp index 45057bb359a..b352ad2c0d2 100644 --- a/cpp/tests/streams/rolling_test.cpp +++ b/cpp/tests/streams/rolling_test.cpp @@ -223,10 +223,12 @@ TEST_F(GroupedRangeRollingTest, RangeWindowBounds) {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; cudf::range_window_bounds preceding = cudf::range_window_bounds::get( - cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}, + cudf::test::get_default_stream()); cudf::range_window_bounds following = cudf::range_window_bounds::get( - cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}, + cudf::test::get_default_stream()); auto const min_periods = cudf::size_type{1}; From a13a464492d54245a008c4c2fe4e97565e56d70f Mon Sep 17 00:00:00 2001 From: srinivasyadav18 Date: Fri, 7 Jun 2024 20:36:25 +0000 Subject: [PATCH 3/3] Minor fix: alphabetize tests in cmake --- cpp/tests/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index f25339595f0..2e842ef2352 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -692,11 +692,11 @@ ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testi ConfigureTest(STREAM_ORCIO_TEST streams/io/orc_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing) +ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_ROLLING_TEST streams/rolling_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) -ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing) -ConfigureTest(STREAM_ROLLING_TEST streams/rolling_test.cpp STREAM_MODE testing) ConfigureTest( STREAM_STRINGS_TEST streams/strings/case_test.cpp