diff --git a/cpp/include/cudf/detail/reduction.cuh b/cpp/include/cudf/detail/reduction.cuh index 023d83f3c24..879f01394cc 100644 --- a/cpp/include/cudf/detail/reduction.cuh +++ b/cpp/include/cudf/detail/reduction.cuh @@ -229,37 +229,36 @@ std::unique_ptr reduce(InputIterator d_in, * * @tparam InputIterator the input column iterator * @tparam OffsetIterator the offset column iterator + * @tparam OutputIterator the output column iterator * @tparam BinaryOp the device binary operator used to reduce * @tparam OutputType the output type of reduction * * @param[in] d_in the begin iterator to input - * @param[in] d_offset the begin iterator to offset - * @param[in] num_segments the number of segments + * @param[in] d_offset_begin the begin iterator to offset + * @param[in] d_offset_end the end iterator to offset. Note: This is + * num_segments+1 elements past `d_offset_begin`. + * @param[out] d_out the begin iterator to output * @param[in] binary_op the reduction operator * @param[in] identity the identity element of the reduction operator * @param[in] stream CUDA stream used for device memory operations and kernel launches. - * @param[in] mr Device memory resource used to allocate the returned column's device - * memory - * @returns Output column in device memory * */ template ::type, + typename OutputType = typename thrust::iterator_value::type, typename std::enable_if_t() && !cudf::is_fixed_point()>* = nullptr> -std::unique_ptr segmented_reduce(InputIterator d_in, - OffsetIterator d_offset, - cudf::size_type num_segments, - BinaryOp binary_op, - OutputType identity, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +void segmented_reduce(InputIterator d_in, + OffsetIterator d_offset_begin, + OffsetIterator d_offset_end, + OutputIterator d_out, + BinaryOp binary_op, + OutputType identity, + rmm::cuda_stream_view stream) { - auto dev_result = make_fixed_width_column( - data_type{type_to_id()}, num_segments, mask_state::UNALLOCATED, stream, mr); - auto dev_result_mview = dev_result->mutable_view(); + auto num_segments = static_cast(std::distance(d_offset_begin, d_offset_end)) - 1; // Allocate temporary storage rmm::device_buffer d_temp_storage; @@ -267,10 +266,10 @@ std::unique_ptr segmented_reduce(InputIterator d_in, cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(), temp_storage_bytes, d_in, - dev_result_mview.data(), + d_out, num_segments, - d_offset, - d_offset + 1, + d_offset_begin, + d_offset_begin + 1, binary_op, identity, stream.value()); @@ -280,30 +279,29 @@ std::unique_ptr segmented_reduce(InputIterator d_in, cub::DeviceSegmentedReduce::Reduce(d_temp_storage.data(), temp_storage_bytes, d_in, - dev_result_mview.data(), + d_out, num_segments, - d_offset, - d_offset + 1, + d_offset_begin, + d_offset_begin + 1, binary_op, identity, stream.value()); - - return dev_result; } template ::type, + typename OutputType = typename thrust::iterator_value::type, typename std::enable_if_t() && !cudf::is_fixed_point())>* = nullptr> -std::unique_ptr segmented_reduce(InputIterator, - OffsetIterator, - cudf::size_type, - BinaryOp, - OutputType, - rmm::cuda_stream_view, - rmm::mr::device_memory_resource*) +void segmented_reduce(InputIterator, + OffsetIterator, + OffsetIterator, + OutputIterator, + BinaryOp, + OutputType, + rmm::cuda_stream_view) { CUDF_FAIL( "Unsupported data types called on segmented_reduce. Only numeric and chrono types are " diff --git a/cpp/src/reductions/simple_segmented.cuh b/cpp/src/reductions/simple_segmented.cuh index 7796794502d..224576cef4a 100644 --- a/cpp/src/reductions/simple_segmented.cuh +++ b/cpp/src/reductions/simple_segmented.cuh @@ -79,20 +79,24 @@ std::unique_ptr simple_segmented_reduction(column_view const& col, auto binary_op = simple_op.get_binary_op(); auto identity = simple_op.template get_identity(); + auto const result_type = + cudf::is_fixed_point(col.type()) ? col.type() : data_type{type_to_id()}; + auto result = + make_fixed_width_column(result_type, num_segments, mask_state::UNALLOCATED, stream, mr); + auto outit = result->mutable_view().template begin(); + // TODO: Explore rewriting null_replacing_element_transformer/element_transformer with nullate - auto result = [&] { - if (col.has_nulls()) { - auto f = simple_op.template get_null_replacing_element_transformer(); - auto it = thrust::make_transform_iterator(dcol->pair_begin(), f); - return cudf::reduction::detail::segmented_reduce( - it, offsets.begin(), num_segments, binary_op, identity, stream, mr); - } else { - auto f = simple_op.template get_element_transformer(); - auto it = thrust::make_transform_iterator(dcol->begin(), f); - return cudf::reduction::detail::segmented_reduce( - it, offsets.begin(), num_segments, binary_op, identity, stream, mr); - } - }(); + if (col.has_nulls()) { + auto f = simple_op.template get_null_replacing_element_transformer(); + auto it = thrust::make_transform_iterator(dcol->pair_begin(), f); + cudf::reduction::detail::segmented_reduce( + it, offsets.begin(), offsets.end(), outit, binary_op, identity, stream); + } else { + auto f = simple_op.template get_element_transformer(); + auto it = thrust::make_transform_iterator(dcol->begin(), f); + cudf::reduction::detail::segmented_reduce( + it, offsets.begin(), offsets.end(), outit, binary_op, identity, stream); + } // Compute the output null mask auto const bitmask = col.null_mask(); @@ -153,14 +157,14 @@ std::unique_ptr string_segmented_reduction(column_view const& col, auto constexpr identity = is_argmin ? cudf::detail::ARGMIN_SENTINEL : cudf::detail::ARGMAX_SENTINEL; - auto gather_map = - cudf::reduction::detail::segmented_reduce(it, - offsets.begin(), - num_segments, - string_comparator, - identity, - stream, - rmm::mr::get_current_device_resource()); + auto gather_map = make_fixed_width_column( + data_type{type_to_id()}, num_segments, mask_state::UNALLOCATED, stream, mr); + + auto gather_map_it = gather_map->mutable_view().begin(); + + cudf::reduction::detail::segmented_reduce( + it, offsets.begin(), offsets.end(), gather_map_it, string_comparator, identity, stream); + auto result = std::move(cudf::detail::gather(table_view{{col}}, *gather_map, cudf::out_of_bounds_policy::NULLIFY, @@ -218,6 +222,49 @@ std::unique_ptr string_segmented_reduction(column_view const& col, CUDF_FAIL("Segmented reduction on string column only supports min and max reduction."); } +/** + * @brief Fixed point segmented reduction for 'min', 'max'. + * + * @tparam InputType the input column data-type + * @tparam Op the operator of cudf::reduction::op:: + + * @param col Input column of data to reduce. + * @param offsets Indices to segment boundaries. + * @param null_handling If `null_policy::INCLUDE`, all elements in a segment + * must be valid for the reduced value to be valid. If `null_policy::EXCLUDE`, + * the reduced value is valid if any element in the segment is valid. + * @param stream Used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * @return Output column in device memory + */ + +template || + std::is_same_v)> +std::unique_ptr fixed_point_segmented_reduction(column_view const& col, + device_span offsets, + null_policy null_handling, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + using RepType = device_storage_type_t; + return simple_segmented_reduction(col, offsets, null_handling, stream, mr); +} + +template () && + !std::is_same_v())> +std::unique_ptr fixed_point_segmented_reduction(column_view const& col, + device_span offsets, + null_policy null_handling, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FAIL("Segmented reduction on fixed point column only supports min and max reduction."); +} + /** * @brief Call reduce and return a column of type bool. * @@ -262,15 +309,15 @@ struct same_column_type_dispatcher { template static constexpr bool is_supported() { - return !(cudf::is_fixed_point() || cudf::is_dictionary() || - std::is_same_v || + return !(cudf::is_dictionary() || std::is_same_v || std::is_same_v); } public: template () && - !std::is_same_v)> + !std::is_same_v && + !cudf::is_fixed_point())> std::unique_ptr operator()(column_view const& col, device_span offsets, null_policy null_handling, @@ -292,6 +339,18 @@ struct same_column_type_dispatcher { return string_segmented_reduction(col, offsets, null_handling, stream, mr); } + template () && cudf::is_fixed_point())> + std::unique_ptr operator()(column_view const& col, + device_span offsets, + null_policy null_handling, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + return fixed_point_segmented_reduction( + col, offsets, null_handling, stream, mr); + } + template ())> std::unique_ptr operator()(column_view const&, device_span, diff --git a/cpp/tests/reductions/segmented_reduction_tests.cpp b/cpp/tests/reductions/segmented_reduction_tests.cpp index 8a9a8fb549e..771f1b8d45b 100644 --- a/cpp/tests/reductions/segmented_reduction_tests.cpp +++ b/cpp/tests/reductions/segmented_reduction_tests.cpp @@ -387,6 +387,212 @@ TEST_F(SegmentedReductionTestUntyped, ReduceEmptyColumn) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect); } +template +struct SegmentedReductionFixedPointTest : public cudf::test::BaseFixture { +}; + +TYPED_TEST_SUITE(SegmentedReductionFixedPointTest, cudf::test::FixedPointTypes); + +TYPED_TEST(SegmentedReductionFixedPointTest, MaxIncludeNulls) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1, null, 3], [1], [null], [null, null], [] + // values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX} + // offsets: {0, 3, 6, 7, 8, 10, 10} + // nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0} + // outputs: {3, XXX, 1, XXX, XXX, XXX} + // output nullmask: {1, 0, 1, 0, 0, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}, + numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = fixed_point_column_wrapper( + {3, XXX, 1, XXX, XXX, XXX}, {1, 0, 1, 0, 0, 0}, numeric::scale_type{scale}); + + auto res = segmented_reduce(input, + d_offsets, + *make_max_aggregation(), + out_type, + null_policy::INCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect); + } +} + +TYPED_TEST(SegmentedReductionFixedPointTest, MaxExcludeNulls) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1, null, 3], [1], [null], [null, null], [] + // values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX} + // offsets: {0, 3, 6, 7, 8, 10, 10} + // nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0} + // outputs: {3, 3, 1, XXX, XXX, XXX} + // output nullmask: {1, 1, 1, 0, 0, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}, + numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = fixed_point_column_wrapper( + {3, 3, 1, XXX, XXX, XXX}, {1, 1, 1, 0, 0, 0}, numeric::scale_type{scale}); + + auto res = segmented_reduce(input, + d_offsets, + *make_max_aggregation(), + out_type, + null_policy::EXCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect); + } +} + +TYPED_TEST(SegmentedReductionFixedPointTest, MinIncludeNulls) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1, null, 3], [1], [null], [null, null], [] + // values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX} + // offsets: {0, 3, 6, 7, 8, 10, 10} + // nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0} + // outputs: {1, XXX, 1, XXX, XXX, XXX} + // output nullmask: {1, 0, 1, 0, 0, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}, + numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = fixed_point_column_wrapper( + {1, XXX, 1, XXX, XXX, XXX}, {1, 0, 1, 0, 0, 0}, numeric::scale_type{scale}); + + auto res = segmented_reduce(input, + d_offsets, + *make_min_aggregation(), + out_type, + null_policy::INCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect); + } +} + +TYPED_TEST(SegmentedReductionFixedPointTest, MinExcludeNulls) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1, null, 3], [1], [null], [null, null], [] + // values: {1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX} + // offsets: {0, 3, 6, 7, 8, 10, 10} + // nullmask: {1, 1, 1, 1, 0, 1, 1, 0, 0, 0} + // outputs: {1, 1, 1, XXX, XXX, XXX} + // output nullmask: {1, 1, 1, 0, 0, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1, XXX, 3, 1, XXX, XXX, XXX}, + {1, 1, 1, 1, 0, 1, 1, 0, 0, 0}, + numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 6, 7, 8, 10, 10}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = fixed_point_column_wrapper( + {1, 1, 1, XXX, XXX, XXX}, {1, 1, 1, 0, 0, 0}, numeric::scale_type{scale}); + + auto res = segmented_reduce(input, + d_offsets, + *make_min_aggregation(), + out_type, + null_policy::EXCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*res, expect); + } +} + +TYPED_TEST(SegmentedReductionFixedPointTest, MaxNonNullableInput) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1], [] + // values: {1, 2, 3, 1} + // offsets: {0, 3, 4} + // outputs: {3, 1, XXX} + // output nullmask: {1, 1, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1}, numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 4, 4}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = + fixed_point_column_wrapper({3, 1, XXX}, {1, 1, 0}, numeric::scale_type{scale}); + + auto include_null_res = segmented_reduce(input, + d_offsets, + *make_max_aggregation(), + out_type, + null_policy::INCLUDE); + + auto exclude_null_res = segmented_reduce(input, + d_offsets, + *make_max_aggregation(), + out_type, + null_policy::EXCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*include_null_res, expect); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*exclude_null_res, expect); + } +} + +TYPED_TEST(SegmentedReductionFixedPointTest, MinNonNullableInput) +{ + // scale: -2, 0, 5 + // [1, 2, 3], [1], [] + // values: {1, 2, 3, 1} + // offsets: {0, 3, 4} + // outputs: {1, 1, XXX} + // output nullmask: {1, 1, 0} + + using RepType = device_storage_type_t; + + for (auto scale : {-2, 0, 5}) { + auto input = fixed_point_column_wrapper({1, 2, 3, 1}, numeric::scale_type{scale}); + auto offsets = std::vector{0, 3, 4, 4}; + auto d_offsets = thrust::device_vector(offsets); + auto out_type = column_view(input).type(); + auto expect = + fixed_point_column_wrapper({1, 1, XXX}, {1, 1, 0}, numeric::scale_type{scale}); + + auto include_null_res = segmented_reduce(input, + d_offsets, + *make_min_aggregation(), + out_type, + null_policy::INCLUDE); + + auto exclude_null_res = segmented_reduce(input, + d_offsets, + *make_min_aggregation(), + out_type, + null_policy::EXCLUDE); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*include_null_res, expect); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*exclude_null_res, expect); + } +} + // String min/max test grid // Segment: Length 0, length 1, length 2 // Element nulls: No nulls, all nulls, some nulls