diff --git a/cpp/include/cudf/rolling/range_window_bounds.hpp b/cpp/include/cudf/rolling/range_window_bounds.hpp index 81885ade2f0..a9ee12cea27 100644 --- a/cpp/include/cudf/rolling/range_window_bounds.hpp +++ b/cpp/include/cudf/rolling/range_window_bounds.hpp @@ -56,18 +56,22 @@ struct range_window_bounds { * @brief Factory method to construct a bounded window boundary. * * @param boundary Finite window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return A bounded window boundary object */ - static range_window_bounds get(scalar const& boundary); + static range_window_bounds get(scalar const& boundary, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Factory method to construct a window boundary * limited to the value of the current row * * @param type The datatype of the window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return A "current row" window boundary object */ - static range_window_bounds current_row(data_type type); + static range_window_bounds current_row(data_type type, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Whether or not the window is bounded to the current row @@ -81,9 +85,11 @@ struct range_window_bounds { * @brief Factory method to construct an unbounded window boundary. * * @param type The datatype of the window boundary + * @param stream CUDA stream used for device memory operations and kernel launches * @return An unbounded window boundary object */ - static range_window_bounds unbounded(data_type type); + static range_window_bounds unbounded(data_type type, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Whether or not the window is unbounded @@ -107,7 +113,9 @@ struct range_window_bounds { extent_type _extent{extent_type::UNBOUNDED}; std::shared_ptr _range_scalar{nullptr}; // To enable copy construction/assignment. - range_window_bounds(extent_type extent_, std::unique_ptr range_scalar_); + range_window_bounds(extent_type extent_, + std::unique_ptr range_scalar_, + rmm::cuda_stream_view = cudf::get_default_stream()); }; /** @} */ // end of group diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 9c1cb48e97e..1158bf22494 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -1055,14 +1055,15 @@ struct dispatch_grouped_range_rolling_window { */ struct to_duration_bounds { template (), void>* = nullptr> - range_window_bounds operator()(size_type num_days) const + range_window_bounds operator()(size_type num_days, rmm::cuda_stream_view stream) const { using DurationT = typename OrderBy::duration; - return range_window_bounds::get(duration_scalar{duration_D{num_days}, true}); + return range_window_bounds::get(duration_scalar{duration_D{num_days}, true, stream}, + stream); } template (), void>* = nullptr> - range_window_bounds operator()(size_type) const + range_window_bounds operator()(size_type, rmm::cuda_stream_view) const { CUDF_FAIL("Expected timestamp orderby column."); } @@ -1093,9 +1094,11 @@ data_type get_duration_type_for(cudf::data_type timestamp_type) * @param timestamp_type Data-type of the orderby column to which the `num_days` is to be adapted. * @return range_window_bounds A `range_window_bounds` to be used with the new API. */ -range_window_bounds to_range_bounds(cudf::size_type num_days, cudf::data_type timestamp_type) +range_window_bounds to_range_bounds(cudf::size_type num_days, + cudf::data_type timestamp_type, + rmm::cuda_stream_view stream) { - return cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, num_days); + return cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, num_days, stream); } /** @@ -1109,11 +1112,13 @@ range_window_bounds to_range_bounds(cudf::size_type num_days, cudf::data_type ti * @return range_window_bounds A `range_window_bounds` to be used with the new API. */ range_window_bounds to_range_bounds(cudf::window_bounds const& days_bounds, - cudf::data_type timestamp_type) + cudf::data_type timestamp_type, + rmm::cuda_stream_view stream) { return days_bounds.is_unbounded() - ? range_window_bounds::unbounded(get_duration_type_for(timestamp_type)) - : cudf::type_dispatcher(timestamp_type, to_duration_bounds{}, days_bounds.value()); + ? range_window_bounds::unbounded(get_duration_type_for(timestamp_type), stream) + : cudf::type_dispatcher( + timestamp_type, to_duration_bounds{}, days_bounds.value(), stream); } } // namespace @@ -1211,8 +1216,8 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto preceding = to_range_bounds(preceding_window_in_days, timestamp_column.type()); - auto following = to_range_bounds(following_window_in_days, timestamp_column.type()); + auto preceding = to_range_bounds(preceding_window_in_days, timestamp_column.type(), stream); + auto following = to_range_bounds(following_window_in_days, timestamp_column.type(), stream); return detail::grouped_range_rolling_window(group_keys, timestamp_column, @@ -1251,9 +1256,9 @@ std::unique_ptr grouped_time_range_rolling_window(table_view const& grou { CUDF_FUNC_RANGE(); range_window_bounds preceding = - to_range_bounds(preceding_window_in_days, timestamp_column.type()); + to_range_bounds(preceding_window_in_days, timestamp_column.type(), stream); range_window_bounds following = - to_range_bounds(following_window_in_days, timestamp_column.type()); + to_range_bounds(following_window_in_days, timestamp_column.type(), stream); return detail::grouped_range_rolling_window(group_keys, timestamp_column, diff --git a/cpp/src/rolling/range_window_bounds.cpp b/cpp/src/rolling/range_window_bounds.cpp index 68e80c6e84e..69792136c64 100644 --- a/cpp/src/rolling/range_window_bounds.cpp +++ b/cpp/src/rolling/range_window_bounds.cpp @@ -32,7 +32,8 @@ namespace { */ struct range_scalar_constructor { template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { CUDF_FAIL( "Unsupported range type. " @@ -40,51 +41,57 @@ struct range_scalar_constructor { } template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { return std::make_unique>( - static_cast const&>(range_scalar_)); + static_cast const&>(range_scalar_), stream); } template () && not cudf::is_boolean())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { - return std::make_unique>( - static_cast const&>(range_scalar_)); + return std::make_unique>(static_cast const&>(range_scalar_), + stream); } template ())> - std::unique_ptr operator()(scalar const& range_scalar_) const + std::unique_ptr operator()(scalar const& range_scalar_, + rmm::cuda_stream_view stream) const { return std::make_unique>( - static_cast const&>(range_scalar_)); + static_cast const&>(range_scalar_), stream); } }; } // namespace -range_window_bounds::range_window_bounds(extent_type extent_, std::unique_ptr range_scalar_) +range_window_bounds::range_window_bounds(extent_type extent_, + std::unique_ptr range_scalar_, + rmm::cuda_stream_view stream) : _extent{extent_}, _range_scalar{std::move(range_scalar_)} { CUDF_EXPECTS(_range_scalar.get(), "Range window scalar cannot be null."); CUDF_EXPECTS(_extent == extent_type::UNBOUNDED || _extent == extent_type::CURRENT_ROW || - _range_scalar->is_valid(), + _range_scalar->is_valid(stream), "Bounded Range window scalar must be valid."); } -range_window_bounds range_window_bounds::unbounded(data_type type) +range_window_bounds range_window_bounds::unbounded(data_type type, rmm::cuda_stream_view stream) { - return {extent_type::UNBOUNDED, make_default_constructed_scalar(type)}; + return {extent_type::UNBOUNDED, make_default_constructed_scalar(type, stream), stream}; } -range_window_bounds range_window_bounds::current_row(data_type type) +range_window_bounds range_window_bounds::current_row(data_type type, rmm::cuda_stream_view stream) { - return {extent_type::CURRENT_ROW, make_default_constructed_scalar(type)}; + return {extent_type::CURRENT_ROW, make_default_constructed_scalar(type, stream), stream}; } -range_window_bounds range_window_bounds::get(scalar const& boundary) +range_window_bounds range_window_bounds::get(scalar const& boundary, rmm::cuda_stream_view stream) { return {extent_type::BOUNDED, - cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary)}; + cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary, stream), + stream}; } } // namespace cudf diff --git a/cpp/tests/streams/rolling_test.cpp b/cpp/tests/streams/rolling_test.cpp index 45057bb359a..b352ad2c0d2 100644 --- a/cpp/tests/streams/rolling_test.cpp +++ b/cpp/tests/streams/rolling_test.cpp @@ -223,10 +223,12 @@ TEST_F(GroupedRangeRollingTest, RangeWindowBounds) {0, 0, 0, 0, 1, 1, 1, 1, 1, 1}}; cudf::range_window_bounds preceding = cudf::range_window_bounds::get( - cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}, + cudf::test::get_default_stream()); cudf::range_window_bounds following = cudf::range_window_bounds::get( - cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}); + cudf::numeric_scalar{int{1}, true, cudf::test::get_default_stream()}, + cudf::test::get_default_stream()); auto const min_periods = cudf::size_type{1};