diff --git a/cpp/include/cudf/rolling.hpp b/cpp/include/cudf/rolling.hpp index 2cd34f48265..d55322dd3e8 100644 --- a/cpp/include/cudf/rolling.hpp +++ b/cpp/include/cudf/rolling.hpp @@ -57,6 +57,7 @@ namespace cudf { * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] agg The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -67,6 +68,7 @@ std::unique_ptr rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -77,6 +79,7 @@ std::unique_ptr rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& agg, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) * * @param default_outputs A column of per-row default values to be returned instead @@ -90,6 +93,7 @@ std::unique_ptr rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -227,6 +231,7 @@ struct window_bounds { * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -238,6 +243,7 @@ std::unique_ptr grouped_rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -249,6 +255,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) */ std::unique_ptr grouped_rolling_window( @@ -258,6 +265,7 @@ std::unique_ptr grouped_rolling_window( window_bounds following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -269,6 +277,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream,, * rmm::device_async_resource_ref mr) * * @param default_outputs A column of per-row default values to be returned instead @@ -283,6 +292,7 @@ std::unique_ptr grouped_rolling_window( size_type following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -295,6 +305,7 @@ std::unique_ptr grouped_rolling_window( * size_type following_window, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) */ std::unique_ptr grouped_rolling_window( @@ -305,6 +316,7 @@ std::unique_ptr grouped_rolling_window( window_bounds following_window, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -387,6 +399,7 @@ std::unique_ptr grouped_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -400,6 +413,7 @@ std::unique_ptr grouped_time_range_rolling_window( size_type following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -415,6 +429,7 @@ std::unique_ptr grouped_time_range_rolling_window( * size_type following_window_in_days, * size_type min_periods, * rolling_aggregation const& aggr, + * rmm::cuda_stream_view stream, * rmm::device_async_resource_ref mr) * * The `preceding_window_in_days` and `following_window_in_days` are specified as a `window_bounds` @@ -429,6 +444,7 @@ std::unique_ptr grouped_time_range_rolling_window( window_bounds following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -536,6 +552,7 @@ std::unique_ptr grouped_time_range_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] aggr The rolling window aggregation type (SUM, MAX, MIN, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -549,6 +566,7 @@ std::unique_ptr grouped_range_rolling_window( range_window_bounds const& following, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -582,6 +600,7 @@ std::unique_ptr grouped_range_rolling_window( * @param[in] min_periods Minimum number of observations in window required to have a value, * otherwise element `i` is null. * @param[in] agg The rolling window aggregation type (sum, max, min, etc.) + * @param[in] stream CUDA stream used for device memory operations and kernel launches * @param[in] mr Device memory resource used to allocate the returned column's device memory * * @returns A nullable output column containing the rolling window results @@ -592,6 +611,7 @@ std::unique_ptr rolling_window( column_view const& following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index d461ed7a109..9c1cb48e97e 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -40,59 +40,6 @@ #include namespace cudf { -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - size_type preceding_window, - size_type following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - window_bounds::get(preceding_window), - window_bounds::get(following_window), - min_periods, - aggr, - mr); -} - -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - window_bounds preceding_window, - window_bounds following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - empty_like(input)->view(), - preceding_window, - following_window, - min_periods, - aggr, - mr); -} - -std::unique_ptr grouped_rolling_window(table_view const& group_keys, - column_view const& input, - column_view const& default_outputs, - size_type preceding_window, - size_type following_window, - size_type min_periods, - rolling_aggregation const& aggr, - rmm::device_async_resource_ref mr) -{ - return grouped_rolling_window(group_keys, - input, - default_outputs, - window_bounds::get(preceding_window), - window_bounds::get(following_window), - min_periods, - aggr, - mr); -} namespace detail { @@ -237,8 +184,8 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, if (group_keys.num_columns() == 0) { // No Groupby columns specified. Treat as one big group. - return rolling_window( - input, default_outputs, preceding_window, following_window, min_periods, aggr, mr); + return detail::rolling_window( + input, default_outputs, preceding_window, following_window, min_periods, aggr, stream, mr); } using sort_groupby_helper = cudf::groupby::detail::sort::sort_groupby_helper; @@ -306,6 +253,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, window_bounds following_window_bounds, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { return detail::grouped_rolling_window(group_keys, @@ -315,7 +263,67 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, following_window_bounds, min_periods, aggr, - cudf::get_default_stream(), + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + size_type preceding_window, + size_type following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return grouped_rolling_window(group_keys, + input, + window_bounds::get(preceding_window), + window_bounds::get(following_window), + min_periods, + aggr, + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + window_bounds preceding_window, + window_bounds following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return detail::grouped_rolling_window(group_keys, + input, + empty_like(input)->view(), + preceding_window, + following_window, + min_periods, + aggr, + stream, + mr); +} + +std::unique_ptr grouped_rolling_window(table_view const& group_keys, + column_view const& input, + column_view const& default_outputs, + size_type preceding_window, + size_type following_window, + size_type min_periods, + rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return detail::grouped_rolling_window(group_keys, + input, + default_outputs, + window_bounds::get(preceding_window), + window_bounds::get(following_window), + min_periods, + aggr, + stream, mr); } @@ -1199,6 +1207,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou size_type following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1213,7 +1222,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } @@ -1237,6 +1246,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou window_bounds following_window_in_days, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1253,7 +1263,7 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } @@ -1277,6 +1287,7 @@ std::unique_ptr grouped_range_rolling_window(table_view const& group_key range_window_bounds const& following, size_type min_periods, rolling_aggregation const& aggr, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -1288,7 +1299,7 @@ std::unique_ptr grouped_range_rolling_window(table_view const& group_key following, min_periods, aggr, - cudf::get_default_stream(), + stream, mr); } diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index a308ed8a7a6..e612bd01118 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -32,17 +32,12 @@ std::unique_ptr rolling_window(column_view const& input, size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::rolling_window(input, - default_outputs, - preceding_window, - following_window, - min_periods, - agg, - cudf::get_default_stream(), - mr); + return detail::rolling_window( + input, default_outputs, preceding_window, following_window, min_periods, agg, stream, mr); } // Applies a fixed-size rolling window function to the values in a column, without default specified @@ -51,6 +46,7 @@ std::unique_ptr rolling_window(column_view const& input, size_type following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); @@ -62,7 +58,7 @@ std::unique_ptr rolling_window(column_view const& input, following_window, min_periods, agg, - cudf::get_default_stream(), + stream, mr); } @@ -72,11 +68,12 @@ std::unique_ptr rolling_window(column_view const& input, column_view const& following_window, size_type min_periods, rolling_aggregation const& agg, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); return detail::rolling_window( - input, preceding_window, following_window, min_periods, agg, cudf::get_default_stream(), mr); + input, preceding_window, following_window, min_periods, agg, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 7db9a06e809..d20a6374473 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -697,6 +697,7 @@ ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_ROLLING_TEST streams/rolling_test.cpp STREAM_MODE testing) ConfigureTest( STREAM_STRINGS_TEST streams/strings/case_test.cpp diff --git a/cpp/tests/streams/rolling_test.cpp b/cpp/tests/streams/rolling_test.cpp new file mode 100644 index 00000000000..45057bb359a --- /dev/null +++ b/cpp/tests/streams/rolling_test.cpp @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +class RollingTest : public cudf::test::BaseFixture {}; + +TEST_F(RollingTest, FixedSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::rolling_window(input, + 2, + 3, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(RollingTest, FixedSizeDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + cudf::rolling_window(input, + defaults, + 2, + 3, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +TEST_F(RollingTest, VariableSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + cudf::test::fixed_width_column_wrapper preceding({2, 2, 2, 2, 3, 3, 3, 3, 3}); + cudf::test::fixed_width_column_wrapper following({3, 3, 3, 3, 3, 2, 2, 2, 2}); + + cudf::rolling_window(input, + preceding, + following, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +class GroupedRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedRollingTest, FixedSize) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + 2, + 3, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, FixedSizeDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + defaults, + 2, + 3, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, WindowBounds) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + unbounded_preceding, + following, + 1, + *cudf::make_min_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedRollingTest, WindowBoundsDefault) +{ + cudf::test::fixed_width_column_wrapper input({1, 2, 3, 4, 5, 6, 7, 8, 9}); + + cudf::test::fixed_width_column_wrapper key_0({1, 1, 1, 2, 2, 2, 3, 3, 3}); + + cudf::test::fixed_width_column_wrapper key_1({4, 4, 4, 5, 5, 5, 6, 6, 6}); + + cudf::test::fixed_width_column_wrapper defaults({42, 42, 42, 42, 9, 9, 7, 1, 1}); + + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + cudf::table_view grouping_keys{std::vector{key_0, key_1}}; + + cudf::grouped_rolling_window(grouping_keys, + input, + defaults, + unbounded_preceding, + following, + 1, + *cudf::make_lead_aggregation(1), + cudf::test::get_default_stream()); +} + +class GroupedTimeRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedTimeRollingTest, FixedSize) +{ + auto const grp_col = + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + auto const time_col = + cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + auto const preceding = 1L; + auto const following = 1L; + auto const min_periods = 1L; + cudf::grouped_time_range_rolling_window( + grouping_keys, + time_col, + cudf::order::ASCENDING, + agg_col, + preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +} + +TEST_F(GroupedTimeRollingTest, WindowBounds) +{ + auto const grp_col = + cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + auto const time_col = + cudf::test::fixed_width_column_wrapper{ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + auto const unbounded_preceding = cudf::window_bounds::unbounded(); + auto const following = cudf::window_bounds::get(1L); + + auto const min_periods = 1L; + cudf::grouped_time_range_rolling_window( + grouping_keys, + time_col, + cudf::order::ASCENDING, + agg_col, + unbounded_preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +} + +class GroupedRangeRollingTest : public cudf::test::BaseFixture {}; + +TEST_F(GroupedRangeRollingTest, RangeWindowBounds) +{ + auto const grp_col = cudf::test::fixed_width_column_wrapper{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + auto const agg_col = cudf::test::fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + {1, 1, 1, 1, 1, 0, 1, 1, 1, 1}}; + + auto const order_by = cudf::test::fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; + + cudf::range_window_bounds preceding = cudf::range_window_bounds::get( + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + + cudf::range_window_bounds following = cudf::range_window_bounds::get( + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + + auto const min_periods = cudf::size_type{1}; + + auto const grouping_keys = cudf::table_view{std::vector{grp_col}}; + + cudf::grouped_range_rolling_window(grouping_keys, + order_by, + cudf::order::ASCENDING, + agg_col, + preceding, + following, + min_periods, + *cudf::make_count_aggregation(), + cudf::test::get_default_stream()); +}