Skip to content

Commit

Permalink
Remove remaining default stream parameters (#12943)
Browse files Browse the repository at this point in the history
This PR closes #9854, removing all default stream parameters in detail APIs. This increases stream-safety by removing the ability to accidentally use the default stream when a detail API is called without an explicit stream parameter when a user-provided stream should have been passed through.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Yunsong Wang (https://github.com/PointKernel)
  - David Wendt (https://github.com/davidwendt)

URL: #12943
  • Loading branch information
vyasr authored Mar 15, 2023
1 parent 6c8bf45 commit dfa9e93
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 44 deletions.
8 changes: 4 additions & 4 deletions cpp/src/binaryop/compiled/struct_binary_ops.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ void apply_struct_binary_op(mutable_column_view& out,
column_view const& rhs,
bool is_lhs_scalar,
bool is_rhs_scalar,
PhysicalElementComparator comparator = {},
rmm::cuda_stream_view stream = cudf::get_default_stream())
PhysicalElementComparator comparator,
rmm::cuda_stream_view stream)
{
auto const compare_orders = std::vector<order>(
lhs.size(),
Expand Down Expand Up @@ -144,8 +144,8 @@ void apply_struct_equality_op(mutable_column_view& out,
bool is_lhs_scalar,
bool is_rhs_scalar,
binary_operator op,
PhysicalEqualityComparator comparator = {},
rmm::cuda_stream_view stream = cudf::get_default_stream())
PhysicalEqualityComparator comparator,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(op == binary_operator::EQUAL || op == binary_operator::NOT_EQUAL ||
op == binary_operator::NULL_EQUALS,
Expand Down
21 changes: 10 additions & 11 deletions cpp/src/join/conditional_join.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,9 +47,9 @@ conditional_join(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
join_kind JoinKind,
std::optional<std::size_t> output_size = {},
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Computes the size of a join operation between two tables without
Expand All @@ -63,13 +63,12 @@ conditional_join(table_view const& left,
*
* @return Join output indices vector pair
*/
std::size_t compute_conditional_join_output_size(
table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
join_kind JoinKind,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
std::size_t compute_conditional_join_output_size(table_view const& left,
table_view const& right,
ast::expression const& binary_predicate,
join_kind JoinKind,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace cudf
8 changes: 4 additions & 4 deletions cpp/src/merge/merge.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -170,8 +170,8 @@ index_vector generate_merged_indices(table_view const& left_table,
table_view const& right_table,
std::vector<order> const& column_order,
std::vector<null_order> const& null_precedence,
bool nullable = true,
rmm::cuda_stream_view stream = cudf::get_default_stream())
bool nullable,
rmm::cuda_stream_view stream)
{
const size_type left_size = left_table.num_rows();
const size_type right_size = right_table.num_rows();
Expand Down Expand Up @@ -410,7 +410,7 @@ table_ptr_type merge(cudf::table_view const& left_table,
// extract merged row order according to indices:
//
auto const merged_indices = generate_merged_indices(
index_left_view, index_right_view, column_order, null_precedence, nullable);
index_left_view, index_right_view, column_order, null_precedence, nullable, stream);

// create merged table:
//
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/partitioning/round_robin.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -151,9 +151,9 @@ namespace detail {
std::pair<std::unique_ptr<table>, std::vector<cudf::size_type>> round_robin_partition(
table_view const& input,
cudf::size_type num_partitions,
cudf::size_type start_partition = 0,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
cudf::size_type start_partition,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto nrows = input.num_rows();

Expand Down
9 changes: 4 additions & 5 deletions cpp/src/rolling/detail/range_window_bounds.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -146,10 +146,9 @@ RepT range_comparable_value_impl(scalar const& range_scalar,
* @return RepType Value of the range scalar
*/
template <typename OrderByType>
range_rep_type<OrderByType> range_comparable_value(
range_window_bounds const& range_bounds,
data_type const& order_by_data_type = data_type{type_to_id<OrderByType>()},
rmm::cuda_stream_view stream = cudf::get_default_stream())
range_rep_type<OrderByType> range_comparable_value(range_window_bounds const& range_bounds,
data_type const& order_by_data_type,
rmm::cuda_stream_view stream)
{
auto const& range_scalar = range_bounds.range_scalar();
using range_type = cudf::detail::range_type<OrderByType>;
Expand Down
54 changes: 38 additions & 16 deletions cpp/tests/rolling/range_window_bounds_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cudf_test/type_lists.hpp>

#include <cudf/rolling/range_window_bounds.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <src/rolling/detail/range_window_bounds.hpp>

#include <vector>
Expand Down Expand Up @@ -57,34 +58,43 @@ TYPED_TEST(TimestampRangeWindowBoundsTest, BoundsConstruction)
using OrderByType = TypeParam;
using range_type = cudf::detail::range_type<OrderByType>;
using rep_type = cudf::detail::range_rep_type<OrderByType>;
auto const dtype = cudf::data_type{cudf::type_to_id<OrderByType>()};

static_assert(cudf::is_duration<range_type>());
auto range_3 = cudf::range_window_bounds::get(cudf::duration_scalar<range_type>{3, true});
EXPECT_FALSE(range_3.is_unbounded() &&
"range_window_bounds constructed from scalar cannot be unbounded.");
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(range_3), rep_type{3});
EXPECT_EQ(
cudf::detail::range_comparable_value<OrderByType>(range_3, dtype, cudf::get_default_stream()),
rep_type{3});

auto range_unbounded =
cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<range_type>()});
EXPECT_TRUE(range_unbounded.is_unbounded() &&
"range_window_bounds::unbounded() must return an unbounded range.");
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(range_unbounded), rep_type{});
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(
range_unbounded, dtype, cudf::get_default_stream()),
rep_type{});
}

TYPED_TEST(TimestampRangeWindowBoundsTest, WrongRangeType)
{
using OrderByType = TypeParam;
auto const dtype = cudf::data_type{cudf::type_to_id<OrderByType>()};

using wrong_range_type = std::conditional_t<std::is_same_v<OrderByType, cudf::timestamp_D>,
cudf::duration_ns,
cudf::duration_D>;
auto range_3 = cudf::range_window_bounds::get(cudf::duration_scalar<wrong_range_type>{3, true});

EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(range_3), cudf::logic_error);
EXPECT_THROW(
cudf::detail::range_comparable_value<OrderByType>(range_3, dtype, cudf::get_default_stream()),
cudf::logic_error);

auto range_unbounded =
cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<wrong_range_type>()});
EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(range_unbounded),
EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(
range_unbounded, dtype, cudf::get_default_stream()),
cudf::logic_error);
}

Expand Down Expand Up @@ -112,33 +122,42 @@ TYPED_TEST(NumericRangeWindowBoundsTest, BoundsConstruction)
using OrderByType = TypeParam;
using range_type = cudf::detail::range_type<OrderByType>;
using rep_type = cudf::detail::range_rep_type<OrderByType>;
auto const dtype = cudf::data_type{cudf::type_to_id<OrderByType>()};

static_assert(std::is_integral_v<range_type>);
auto range_3 = cudf::range_window_bounds::get(cudf::numeric_scalar<range_type>{3, true});
EXPECT_FALSE(range_3.is_unbounded() &&
"range_window_bounds constructed from scalar cannot be unbounded.");
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(range_3), rep_type{3});
EXPECT_EQ(
cudf::detail::range_comparable_value<OrderByType>(range_3, dtype, cudf::get_default_stream()),
rep_type{3});

auto range_unbounded =
cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<range_type>()});
EXPECT_TRUE(range_unbounded.is_unbounded() &&
"range_window_bounds::unbounded() must return an unbounded range.");
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(range_unbounded), rep_type{});
EXPECT_EQ(cudf::detail::range_comparable_value<OrderByType>(
range_unbounded, dtype, cudf::get_default_stream()),
rep_type{});
}

TYPED_TEST(NumericRangeWindowBoundsTest, WrongRangeType)
{
using OrderByType = TypeParam;
auto const dtype = cudf::data_type{cudf::type_to_id<OrderByType>()};

using wrong_range_type =
std::conditional_t<std::is_same_v<OrderByType, int32_t>, int16_t, int32_t>;
auto range_3 = cudf::range_window_bounds::get(cudf::numeric_scalar<wrong_range_type>{3, true});

EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(range_3), cudf::logic_error);
EXPECT_THROW(
cudf::detail::range_comparable_value<OrderByType>(range_3, dtype, cudf::get_default_stream()),
cudf::logic_error);

auto range_unbounded =
cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<wrong_range_type>()});
EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(range_unbounded),
EXPECT_THROW(cudf::detail::range_comparable_value<OrderByType>(
range_unbounded, dtype, cudf::get_default_stream()),
cudf::logic_error);
}

Expand All @@ -150,8 +169,9 @@ TYPED_TEST_SUITE(DecimalRangeBoundsTest, cudf::test::FixedPointTypes);

TYPED_TEST(DecimalRangeBoundsTest, BoundsConstruction)
{
using DecimalT = TypeParam;
using Rep = cudf::detail::range_rep_type<DecimalT>;
using DecimalT = TypeParam;
using Rep = cudf::detail::range_rep_type<DecimalT>;
auto const dtype = cudf::data_type{cudf::type_to_id<DecimalT>()};

// Interval type must match the decimal type.
static_assert(std::is_same_v<cudf::detail::range_type<DecimalT>, DecimalT>);
Expand All @@ -160,7 +180,9 @@ TYPED_TEST(DecimalRangeBoundsTest, BoundsConstruction)
cudf::fixed_point_scalar<DecimalT>{Rep{3}, numeric::scale_type{0}});
EXPECT_FALSE(range_3.is_unbounded() &&
"range_window_bounds constructed from scalar cannot be unbounded.");
EXPECT_EQ(cudf::detail::range_comparable_value<DecimalT>(range_3), Rep{3});
EXPECT_EQ(
cudf::detail::range_comparable_value<DecimalT>(range_3, dtype, cudf::get_default_stream()),
Rep{3});

auto const range_unbounded =
cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<DecimalT>()});
Expand All @@ -183,17 +205,17 @@ TYPED_TEST(DecimalRangeBoundsTest, Rescale)
for (auto const range_scale : {-2, -1, 0, 1, 2}) {
auto const decimal_range_bounds = cudf::range_window_bounds::get(
cudf::fixed_point_scalar<DecimalT>{RepT{20}, numeric::scale_type{range_scale}});
auto const rescaled_range_rep =
cudf::detail::range_comparable_value<DecimalT>(decimal_range_bounds, order_by_data_type);
auto const rescaled_range_rep = cudf::detail::range_comparable_value<DecimalT>(
decimal_range_bounds, order_by_data_type, cudf::get_default_stream());
EXPECT_EQ(rescaled_range_rep, RepT{20} * pow10[range_scale - order_by_scale]);
}

// Order By column scale cannot exceed range scale:
{
auto const decimal_range_bounds = cudf::range_window_bounds::get(
cudf::fixed_point_scalar<DecimalT>{RepT{200}, numeric::scale_type{-3}});
EXPECT_THROW(
cudf::detail::range_comparable_value<DecimalT>(decimal_range_bounds, order_by_data_type),
cudf::logic_error);
EXPECT_THROW(cudf::detail::range_comparable_value<DecimalT>(
decimal_range_bounds, order_by_data_type, cudf::get_default_stream()),
cudf::logic_error);
}
}

0 comments on commit dfa9e93

Please sign in to comment.