Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segmented Min/Max for Fixed Point Types #10794

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions cpp/include/cudf/detail/reduction.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -230,47 +230,45 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
* @tparam InputIterator the input column iterator
* @tparam OffsetIterator the offset column iterator
* @tparam BinaryOp the device binary operator used to reduce
* @tparam OutputIterator the output column iterator
* @tparam OutputType the output type of reduction
*
* @param[in] d_in the begin iterator to input
* @param[in] d_offset the begin iterator to offset
isVoid marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] num_segments the number of segments
* @param[out] d_out the begin iterator to output
* @param[in] binary_op the reduction operator
* @param[in] identity the identity element of the reduction operator
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
* @param[in] mr Device memory resource used to allocate the returned column's device
* memory
* @returns Output column in device memory
*
*/
template <typename InputIterator,
typename OffsetIterator,
typename BinaryOp,
typename OutputType = typename thrust::iterator_value<InputIterator>::type,
typename OutputIterator,
isVoid marked this conversation as resolved.
Show resolved Hide resolved
typename OutputType = typename thrust::iterator_value<OutputIterator>::type,
typename std::enable_if_t<is_fixed_width<OutputType>() &&
!cudf::is_fixed_point<OutputType>()>* = nullptr>
std::unique_ptr<column> segmented_reduce(InputIterator d_in,
OffsetIterator d_offset,
cudf::size_type num_segments,
BinaryOp binary_op,
OutputType identity,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
void segmented_reduce(InputIterator d_in,
OffsetIterator d_offset_begin,
OffsetIterator d_offset_end,
OutputIterator d_out,
BinaryOp binary_op,
OutputType identity,
rmm::cuda_stream_view stream)
{
auto dev_result = make_fixed_width_column(
data_type{type_to_id<OutputType>()}, num_segments, mask_state::UNALLOCATED, stream, mr);
auto dev_result_mview = dev_result->mutable_view();
auto num_segments = static_cast<size_type>(std::distance(d_offset_begin, d_offset_end)) - 1;

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
size_t temp_storage_bytes = 0;
cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(),
temp_storage_bytes,
d_in,
dev_result_mview.data<OutputType>(),
d_out,
num_segments,
d_offset,
d_offset + 1,
d_offset_begin,
d_offset_begin + 1,
binary_op,
identity,
stream.value());
Expand All @@ -280,30 +278,29 @@ std::unique_ptr<column> segmented_reduce(InputIterator d_in,
cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(),
temp_storage_bytes,
d_in,
dev_result_mview.data<OutputType>(),
d_out,
num_segments,
d_offset,
d_offset + 1,
d_offset_begin,
d_offset_begin + 1,
binary_op,
identity,
stream.value());

return dev_result;
}

template <typename InputIterator,
typename OffsetIterator,
typename BinaryOp,
typename OutputType = typename thrust::iterator_value<InputIterator>::type,
typename OutputIterator,
isVoid marked this conversation as resolved.
Show resolved Hide resolved
typename OutputType = typename thrust::iterator_value<OutputIterator>::type,
typename std::enable_if_t<!(is_fixed_width<OutputType>() &&
!cudf::is_fixed_point<OutputType>())>* = nullptr>
std::unique_ptr<column> segmented_reduce(InputIterator,
OffsetIterator,
cudf::size_type,
BinaryOp,
OutputType,
rmm::cuda_stream_view,
rmm::mr::device_memory_resource*)
void segmented_reduce(InputIterator,
OffsetIterator,
OffsetIterator,
OutputIterator,
BinaryOp,
OutputType,
rmm::cuda_stream_view)
{
CUDF_FAIL(
"Unsupported data types called on segmented_reduce. Only numeric and chrono types are "
Expand Down
107 changes: 83 additions & 24 deletions cpp/src/reductions/simple_segmented.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,24 @@ std::unique_ptr<column> simple_segmented_reduction(column_view const& col,
auto binary_op = simple_op.get_binary_op();
auto identity = simple_op.template get_identity<ResultType>();

auto const result_type =
cudf::is_fixed_point(col.type()) ? col.type() : data_type{type_to_id<ResultType>()};
auto result =
make_fixed_width_column(result_type, num_segments, mask_state::UNALLOCATED, stream, mr);
auto outit = result->mutable_view().template begin<ResultType>();

// TODO: Explore rewriting null_replacing_element_transformer/element_transformer with nullate
auto result = [&] {
if (col.has_nulls()) {
auto f = simple_op.template get_null_replacing_element_transformer<ResultType>();
auto it = thrust::make_transform_iterator(dcol->pair_begin<InputType, true>(), f);
return cudf::reduction::detail::segmented_reduce(
it, offsets.begin(), num_segments, binary_op, identity, stream, mr);
} else {
auto f = simple_op.template get_element_transformer<ResultType>();
auto it = thrust::make_transform_iterator(dcol->begin<InputType>(), f);
return cudf::reduction::detail::segmented_reduce(
it, offsets.begin(), num_segments, binary_op, identity, stream, mr);
}
}();
if (col.has_nulls()) {
auto f = simple_op.template get_null_replacing_element_transformer<ResultType>();
auto it = thrust::make_transform_iterator(dcol->pair_begin<InputType, true>(), f);
cudf::reduction::detail::segmented_reduce(
it, offsets.begin(), offsets.end(), outit, binary_op, identity, stream);
} else {
auto f = simple_op.template get_element_transformer<ResultType>();
auto it = thrust::make_transform_iterator(dcol->begin<InputType>(), f);
cudf::reduction::detail::segmented_reduce(
it, offsets.begin(), offsets.end(), outit, binary_op, identity, stream);
}

// Compute the output null mask
auto const bitmask = col.null_mask();
Expand Down Expand Up @@ -153,14 +157,14 @@ std::unique_ptr<column> string_segmented_reduction(column_view const& col,
auto constexpr identity =
is_argmin ? cudf::detail::ARGMIN_SENTINEL : cudf::detail::ARGMAX_SENTINEL;

auto gather_map =
cudf::reduction::detail::segmented_reduce(it,
offsets.begin(),
num_segments,
string_comparator,
identity,
stream,
rmm::mr::get_current_device_resource());
auto gather_map = make_fixed_width_column(
data_type{type_to_id<size_type>()}, num_segments, mask_state::UNALLOCATED, stream, mr);

auto gather_map_it = gather_map->mutable_view().begin<size_type>();

cudf::reduction::detail::segmented_reduce(
it, offsets.begin(), offsets.end(), gather_map_it, string_comparator, identity, stream);

auto result = std::move(cudf::detail::gather(table_view{{col}},
*gather_map,
cudf::out_of_bounds_policy::NULLIFY,
Expand Down Expand Up @@ -218,6 +222,49 @@ std::unique_ptr<column> string_segmented_reduction(column_view const& col,
CUDF_FAIL("Segmented reduction on string column only supports min and max reduction.");
}

/**
* @brief Fixed point segmented reduction for 'min', 'max'.
*
* @tparam InputType the input column data-type
* @tparam Op the operator of cudf::reduction::op::

* @param col Input column of data to reduce.
* @param offsets Indices to segment boundaries.
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment
* must be valid for the reduced value to be valid. If `null_policy::EXCLUDE`,
* the reduced value is valid if any element in the segment is valid.
* @param stream Used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Output column in device memory
*/

template <typename InputType,
typename Op,
CUDF_ENABLE_IF(std::is_same_v<Op, cudf::reduction::op::min> ||
std::is_same_v<Op, cudf::reduction::op::max>)>
isVoid marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<column> fixed_point_segmented_reduction(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using RepType = device_storage_type_t<InputType>;
return simple_segmented_reduction<RepType, RepType, Op>(col, offsets, null_handling, stream, mr);
}

template <typename InputType,
typename Op,
CUDF_ENABLE_IF(!std::is_same_v<Op, cudf::reduction::op::min>() &&
!std::is_same_v<Op, cudf::reduction::op::max>())>
std::unique_ptr<column> fixed_point_segmented_reduction(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FAIL("Segmented reduction on fixed point column only supports min and max reduction.");
}

/**
* @brief Call reduce and return a column of type bool.
*
Expand Down Expand Up @@ -262,15 +309,15 @@ struct same_column_type_dispatcher {
template <typename ElementType>
static constexpr bool is_supported()
{
return !(cudf::is_fixed_point<ElementType>() || cudf::is_dictionary<ElementType>() ||
std::is_same_v<ElementType, cudf::list_view> ||
return !(cudf::is_dictionary<ElementType>() || std::is_same_v<ElementType, cudf::list_view> ||
std::is_same_v<ElementType, cudf::struct_view>);
}

public:
template <typename ElementType,
CUDF_ENABLE_IF(is_supported<ElementType>() &&
!std::is_same_v<ElementType, string_view>)>
!std::is_same_v<ElementType, string_view> &&
!cudf::is_fixed_point<ElementType>())>
std::unique_ptr<column> operator()(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
Expand All @@ -292,6 +339,18 @@ struct same_column_type_dispatcher {
return string_segmented_reduction<ElementType, Op>(col, offsets, null_handling, stream, mr);
}

template <typename ElementType,
CUDF_ENABLE_IF(is_supported<ElementType>() && cudf::is_fixed_point<ElementType>())>
std::unique_ptr<column> operator()(column_view const& col,
device_span<size_type const> offsets,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return fixed_point_segmented_reduction<ElementType, Op>(
col, offsets, null_handling, stream, mr);
}

template <typename ElementType, CUDF_ENABLE_IF(!is_supported<ElementType>())>
std::unique_ptr<column> operator()(column_view const&,
device_span<size_type const>,
Expand Down
134 changes: 134 additions & 0 deletions cpp/tests/reductions/segmented_reduction_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,140 @@ TEST_F(SegmentedReductionTestUntyped, ReduceEmptyColumn)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect);
}

template <typename T>
struct SegmentedReductionFixedPointTest : public cudf::test::BaseFixture {
};

TYPED_TEST_SUITE(SegmentedReductionFixedPointTest, cudf::test::FixedPointTypes);

TYPED_TEST(SegmentedReductionFixedPointTest, MaxIncludeNulls)
{
// scale: -2, 0, 5
// [1, 2, 3], [1, null, 3], [1], [null], [null, null], []
// values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}
// offsets: {0, 3, 6, 7, 8, 10, 10}
// nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}
// outputs: {3, XXX, 1, XXX, XXX, XXX}
// output nullmask: {1, 0, 1, 0, 0, 0}

using RepType = device_storage_type_t<TypeParam>;

for (auto scale : {-2, 0, 5}) {
auto input = fixed_point_column_wrapper<RepType>({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0},
numeric::scale_type{scale});
auto offsets = std::vector<size_type>{0, 3, 6, 7, 8, 10, 10};
auto d_offsets = thrust::device_vector<size_type>(offsets);
auto out_type = column_view(input).type();
auto expect = fixed_point_column_wrapper<RepType>(
{3, XXX, 1, XXX, XXX, XXX}, {1, 0, 1, 0, 0, 0}, numeric::scale_type{scale});

auto res = segmented_reduce(input,
d_offsets,
*make_max_aggregation<segmented_reduce_aggregation>(),
out_type,
null_policy::INCLUDE);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect);
}
}

TYPED_TEST(SegmentedReductionFixedPointTest, MaxExcludeNulls)
{
// scale: -2, 0, 5
// [1, 2, 3], [1, null, 3], [1], [null], [null, null], []
// values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}
// offsets: {0, 3, 6, 7, 8, 10, 10}
// nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}
// outputs: {3, 3, 1, XXX, XXX, XXX}
// output nullmask: {1, 1, 1, 0, 0, 0}

using RepType = device_storage_type_t<TypeParam>;

for (auto scale : {-2, 0, 5}) {
auto input = fixed_point_column_wrapper<RepType>({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0},
numeric::scale_type{scale});
auto offsets = std::vector<size_type>{0, 3, 6, 7, 8, 10, 10};
auto d_offsets = thrust::device_vector<size_type>(offsets);
auto out_type = column_view(input).type();
auto expect = fixed_point_column_wrapper<RepType>(
{3, 3, 1, XXX, XXX, XXX}, {1, 1, 1, 0, 0, 0}, numeric::scale_type{scale});

auto res = segmented_reduce(input,
d_offsets,
*make_max_aggregation<segmented_reduce_aggregation>(),
out_type,
null_policy::EXCLUDE);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect);
}
}

TYPED_TEST(SegmentedReductionFixedPointTest, MinIncludeNulls)
{
// scale: -2, 0, 5
// [1, 2, 3], [1, null, 3], [1], [null], [null, null], []
// values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}
// offsets: {0, 3, 6, 7, 8, 10, 10}
// nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}
// outputs: {1, XXX, 1, XXX, XXX, XXX}
// output nullmask: {1, 0, 1, 0, 0, 0}

using RepType = device_storage_type_t<TypeParam>;

for (auto scale : {-2, 0, 5}) {
auto input = fixed_point_column_wrapper<RepType>({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0},
numeric::scale_type{scale});
auto offsets = std::vector<size_type>{0, 3, 6, 7, 8, 10, 10};
auto d_offsets = thrust::device_vector<size_type>(offsets);
auto out_type = column_view(input).type();
auto expect = fixed_point_column_wrapper<RepType>(
{1, XXX, 1, XXX, XXX, XXX}, {1, 0, 1, 0, 0, 0}, numeric::scale_type{scale});

auto res = segmented_reduce(input,
d_offsets,
*make_min_aggregation<segmented_reduce_aggregation>(),
out_type,
null_policy::INCLUDE);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect);
}
}

TYPED_TEST(SegmentedReductionFixedPointTest, MinExcludeNulls)
{
// scale: -2, 0, 5
// [1, 2, 3], [1, null, 3], [1], [null], [null, null], []
// values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}
// offsets: {0, 3, 6, 7, 8, 10, 10}
// nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}
// outputs: {1, 1, 1, XXX, XXX, XXX}
// output nullmask: {1, 1, 1, 0, 0, 0}

using RepType = device_storage_type_t<TypeParam>;

for (auto scale : {-2, 0, 5}) {
auto input = fixed_point_column_wrapper<RepType>({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0},
numeric::scale_type{scale});
auto offsets = std::vector<size_type>{0, 3, 6, 7, 8, 10, 10};
auto d_offsets = thrust::device_vector<size_type>(offsets);
auto out_type = column_view(input).type();
auto expect = fixed_point_column_wrapper<RepType>(
{1, 1, 1, XXX, XXX, XXX}, {1, 1, 1, 0, 0, 0}, numeric::scale_type{scale});

auto res = segmented_reduce(input,
d_offsets,
*make_min_aggregation<segmented_reduce_aggregation>(),
out_type,
null_policy::EXCLUDE);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect);
}
}

isVoid marked this conversation as resolved.
Show resolved Hide resolved
// String min/max test grid
// Segment: Length 0, length 1, length 2
// Element nulls: No nulls, all nulls, some nulls
Expand Down