From 0eeb0c9239eb62b08a2be81d339c03c1078659c3 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 27 May 2021 18:12:13 -0700 Subject: [PATCH] Fix result column types for empty inputs to rolling window (#8274) Fixes the rolling-window part of #7611. All the rolling window functions return empty results when the input aggregation column is empty, just as they should. But the column types are incorrectly set to match the input type. While this is alright for `[MIN(), MAX(), LEAD(), LAG()]`, it is incorrect for some aggregations: Aggregation | Input Types | Output Type | --------------|----------------------|-----------------------------------| COUNT_VALID | All types | INT32 | COUNT_ALL | All types | INT32 | ROW_NUMBER | All types | INT32 | SUM | Numerics (e.g. INT8) | 64-bit promoted type (e.g. INT64) | SUM | Chrono | Same as input type | SUM | All else | Unsupported | MEAN | Numerics | FLOAT64 | MEAN | Chrono | Same as input type | MEAN | All else | Unsupported | COLLECT_LIST | All types T | LIST with child of type T | This mapping is congruent with `cudf::detail::target_type_t` from `<cudf/detail/aggregation/aggregation.hpp>`. This commit corrects the type of the output column that results from an empty input. It adds tests for all the combinations listed above. Note: This is dependent on #8158, and should be merged after that is committed. 
Authors: - MithunR (https://github.com/mythrocks) Approvers: - Nghia Truong (https://github.com/ttnghia) - https://github.com/nvdbaranec - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/8274 --- .../cudf/detail/aggregation/aggregation.hpp | 11 +- cpp/src/rolling/grouped_rolling.cu | 4 +- cpp/src/rolling/rolling.cu | 9 +- cpp/src/rolling/rolling_detail.cuh | 44 +- cpp/tests/CMakeLists.txt | 8 +- cpp/tests/rolling/empty_input_test.cpp | 402 ++++++++++++++++++ 6 files changed, 463 insertions(+), 15 deletions(-) create mode 100644 cpp/tests/rolling/empty_input_test.cpp diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index 00562f12633..e230ce0b757 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -784,9 +784,10 @@ struct target_type_impl { // Except for chrono types where result is chrono. (Use FloorDiv) // TODO: MEAN should be only be enabled for duration types - not for timestamps template -struct target_type_impl() && (k == aggregation::MEAN)>> { +struct target_type_impl< + Source, + k, + std::enable_if_t() && !is_chrono() && (k == aggregation::MEAN)>> { using type = double; }; @@ -1032,7 +1033,7 @@ template struct dispatch_aggregation { #pragma nv_exec_check_disable template - CUDA_HOST_DEVICE_CALLABLE decltype(auto) operator()(F&& f, Ts&&... args) const noexcept + CUDA_HOST_DEVICE_CALLABLE decltype(auto) operator()(F&& f, Ts&&... args) const { return f.template operator()(std::forward(args)...); } @@ -1043,7 +1044,7 @@ struct dispatch_source { template CUDA_HOST_DEVICE_CALLABLE decltype(auto) operator()(aggregation::Kind k, F&& f, - Ts&&... args) const noexcept + Ts&&... 
args) const { return aggregation_dispatcher( k, dispatch_aggregation{}, std::forward(f), std::forward(args)...); diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index 5702a32536c..eee73d7b258 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -94,7 +94,7 @@ std::unique_ptr grouped_rolling_window(table_view const& group_keys, { CUDF_FUNC_RANGE(); - if (input.is_empty()) return empty_like(input); + if (input.is_empty()) { return cudf::detail::empty_output_for_rolling_aggregation(input, aggr); } CUDF_EXPECTS((group_keys.num_columns() == 0 || group_keys.num_rows() == input.size()), "Size mismatch between group_keys and input vector."); @@ -949,7 +949,7 @@ std::unique_ptr grouped_range_rolling_window(table_view const& group_key { CUDF_FUNC_RANGE(); - if (input.is_empty()) return empty_like(input); + if (input.is_empty()) { return cudf::detail::empty_output_for_rolling_aggregation(input, aggr); } CUDF_EXPECTS((group_keys.num_columns() == 0 || group_keys.num_rows() == input.size()), "Size mismatch between group_keys and input vector."); diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index 63032128c4d..eb81f81ef12 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +#include #include "rolling_detail.cuh" namespace cudf { @@ -46,7 +47,8 @@ std::unique_ptr rolling_window(column_view const& input, { CUDF_FUNC_RANGE(); - if (input.is_empty()) return empty_like(input); + if (input.is_empty()) { return cudf::detail::empty_output_for_rolling_aggregation(input, agg); } + CUDF_EXPECTS((min_periods >= 0), "min_periods must be non-negative"); CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()), @@ -88,8 +90,9 @@ std::unique_ptr rolling_window(column_view const& input, { CUDF_FUNC_RANGE(); - if (preceding_window.is_empty() || following_window.is_empty() || input.is_empty()) - return empty_like(input); + if (preceding_window.is_empty() || following_window.is_empty() || input.is_empty()) { + return cudf::detail::empty_output_for_rolling_aggregation(input, agg); + } CUDF_EXPECTS(preceding_window.type().id() == type_id::INT32 && following_window.type().id() == type_id::INT32, diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 6f1776e40a3..56b8bad0bac 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -310,6 +310,48 @@ struct DeviceRollingRowNumber { } }; +struct agg_specific_empty_output { + template + std::unique_ptr operator()(column_view const& input, rolling_aggregation const& agg) const + { + using target_type = cudf::detail::target_type_t; + + if constexpr (std::is_same_v, void>) { + CUDF_FAIL("Unsupported combination of column-type and aggregation."); + } + + if constexpr (cudf::is_fixed_width()) { + return cudf::make_empty_column(data_type{type_to_id()}); + } + + if constexpr (op == aggregation::COLLECT_LIST) { + return cudf::make_lists_column( + 0, make_empty_column(data_type{type_to_id()}), empty_like(input), 0, {}); + } + + return empty_like(input); + } +}; + +std::unique_ptr empty_output_for_rolling_aggregation(column_view const& input, + rolling_aggregation const& agg) +{ + // TODO: + // Ideally, for UDF 
aggregations, the returned column would match + // the agg's return type. It currently returns empty_like(input), because: + // 1. This preserves prior behaviour for empty input columns. + // 2. There is insufficient information to construct nested return colums. + // `cudf::make_udf_aggregation()` expresses the return type as a `data_type` + // which cannot express recursively nested types (e.g. `STRUCT>`.) + // 3. In any case, UDFs that return nested types are not currently supported. + // Constructing a more accurate return type for UDFs will be taken up + // at a later date. + return agg.kind == aggregation::CUDA || agg.kind == aggregation::PTX + ? empty_like(input) + : cudf::detail::dispatch_type_and_aggregation( + input.type(), agg.kind, agg_specific_empty_output{}, input, agg); +} + /** * @brief Operator for applying a LEAD rolling aggregation on a single window. */ @@ -1089,7 +1131,7 @@ std::unique_ptr rolling_window(column_view const& input, static_assert(warp_size == cudf::detail::size_in_bits(), "bitmask_type size does not match CUDA warp size"); - if (input.is_empty()) { return empty_like(input); } + if (input.is_empty()) { return cudf::detail::empty_output_for_rolling_aggregation(input, agg); } if (cudf::is_dictionary(input.type())) { CUDF_EXPECTS(agg.kind == aggregation::COUNT_ALL || agg.kind == aggregation::COUNT_VALID || diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 23b92250549..ddeea40df77 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -280,13 +280,13 @@ ConfigureTest(STREAM_COMPACTION_TEST ################################################################################################### # - rolling tests --------------------------------------------------------------------------------- ConfigureTest(ROLLING_TEST - rolling/rolling_test.cpp + rolling/collect_ops_test.cpp + rolling/empty_input_test.cpp rolling/grouped_rolling_test.cpp rolling/lead_lag_test.cpp - rolling/range_window_bounds_test.cpp 
rolling/range_rolling_window_test.cpp - rolling/collect_ops_test.cpp - ) + rolling/range_window_bounds_test.cpp + rolling/rolling_test.cpp) ################################################################################################### # - filling test ---------------------------------------------------------------------------------- diff --git a/cpp/tests/rolling/empty_input_test.cpp b/cpp/tests/rolling/empty_input_test.cpp new file mode 100644 index 00000000000..3296f9d32f9 --- /dev/null +++ b/cpp/tests/rolling/empty_input_test.cpp @@ -0,0 +1,402 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace { +// Helper functions to construct rolling window operators. 
+auto count_valid() +{ + return cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); +} + +auto count_all() +{ + return cudf::make_count_aggregation(cudf::null_policy::INCLUDE); +} + +auto sum() { return cudf::make_sum_aggregation(); } + +auto mean() { return cudf::make_mean_aggregation(); } + +auto min() { return cudf::make_min_aggregation(); } + +auto max() { return cudf::make_max_aggregation(); } + +auto lead() { return cudf::make_lead_aggregation(3); } + +auto lag() { return cudf::make_lag_aggregation(3); } + +auto row_number() { return cudf::make_row_number_aggregation(); } + +auto collect_list() { return cudf::make_collect_list_aggregation(); } + +auto udf() +{ + return cudf::make_udf_aggregation( + cudf::udf_type::CUDA, "", cudf::data_type{cudf::type_id::INT32}); +} + +// Constants for rolling_window. +auto const min_periods = 1; +auto const preceding = 2; +auto const following = 2; +auto const preceding_scalar = cudf::numeric_scalar(preceding); +auto const following_scalar = cudf::numeric_scalar(following); +auto const preceding_column = cudf::test::fixed_width_column_wrapper{}.release(); +auto const following_column = cudf::test::fixed_width_column_wrapper{}.release(); +auto const preceding_col = preceding_column -> view(); +auto const following_col = following_column -> view(); +} // namespace + +struct RollingEmptyInputTest : cudf::test::BaseFixture { +}; + +template +struct TypedRollingEmptyInputTest : RollingEmptyInputTest { +}; + +TYPED_TEST_CASE(TypedRollingEmptyInputTest, cudf::test::FixedWidthTypes); + +using cudf::rolling_aggregation; +using agg_vector_t = std::vector>; + +void rolling_output_type_matches(cudf::column_view const& result, + cudf::type_id expected_type, + cudf::type_id expected_child_type) +{ + using namespace cudf; + using namespace cudf::test; + + EXPECT_EQ(result.type().id(), expected_type); + EXPECT_EQ(result.size(), 0); + if (expected_type == cudf::type_id::LIST) { + 
EXPECT_EQ(result.child(cudf::lists_column_view::child_column_index).type().id(), + expected_child_type); + } + if (expected_type == cudf::type_id::STRUCT) { + EXPECT_EQ(result.child(0).type().id(), expected_child_type); + } +} + +void rolling_output_type_matches(cudf::column_view const& empty_input, + agg_vector_t const& aggs, + cudf::type_id expected_type, + cudf::type_id expected_child_type = cudf::type_id::EMPTY) +{ + using namespace cudf; + using namespace cudf::test; + + for (auto const& agg : aggs) { + auto rolling_output_numeric_bounds = + rolling_window(empty_input, preceding, following, min_periods, *agg); + rolling_output_type_matches( + rolling_output_numeric_bounds->view(), expected_type, expected_child_type); + + auto rolling_output_columnar_bounds = + rolling_window(empty_input, preceding_col, following_col, min_periods, *agg); + rolling_output_type_matches( + rolling_output_columnar_bounds->view(), expected_type, expected_child_type); + + auto grouped_rolling_output = grouped_rolling_window( + table_view{std::vector{empty_input}}, empty_input, preceding, following, min_periods, *agg); + rolling_output_type_matches(grouped_rolling_output->view(), expected_type, expected_child_type); + + auto grouped_range_rolling_output = + grouped_range_rolling_window(table_view{std::vector{empty_input}}, + empty_input, + order::ASCENDING, + empty_input, + range_window_bounds::get(preceding_scalar), + range_window_bounds::get(following_scalar), + min_periods, + *agg); + rolling_output_type_matches( + grouped_range_rolling_output->view(), expected_type, expected_child_type); + } +} + +void rolling_window_throws(cudf::column_view const& empty_input, agg_vector_t const& aggs) +{ + for (auto const& agg : aggs) { + EXPECT_THROW(rolling_window(empty_input, 2, 2, 1, *agg), cudf::logic_error); + } +} + +TYPED_TEST(TypedRollingEmptyInputTest, EmptyFixedWidthInputs) +{ + using InputType = TypeParam; + using namespace cudf; + using namespace cudf::test; + + auto input_col = 
fixed_width_column_wrapper{}.release(); + auto empty_input = input_col->view(); + + /// Test aggregations that yield columns of type `size_type`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(count_valid()); + aggs.emplace_back(count_all()); + aggs.emplace_back(row_number()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// Test aggregations that yield columns of same type as input. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(min()); + aggs.emplace_back(max()); + aggs.emplace_back(lead()); + aggs.emplace_back(lag()); + aggs.emplace_back(udf()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// `SUM` returns 64-bit promoted types for integral/decimal input. + /// For other fixed-width input types, the same type is returned. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(sum()); + + using expected_type = cudf::detail::target_type_t; + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// `MEAN` returns float64 for all numeric types, + /// except for chrono-types, which yield the same chrono-type. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(mean()); + + using expected_type = cudf::detail::target_type_t; + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// For an input type `T`, `COLLECT_LIST` returns a column of type `list`. + { + auto aggs = std::vector>{}; + aggs.emplace_back(collect_list()); + + rolling_output_type_matches( + empty_input, aggs, type_to_id(), type_to_id()); + } +} + +TEST_F(RollingEmptyInputTest, Strings) +{ + using namespace cudf; + using namespace cudf::test; + + auto input_col = strings_column_wrapper{}.release(); + auto empty_input = input_col->view(); + + /// Test aggregations that yield columns of type `size_type`. 
+ { + auto aggs = agg_vector_t{}; + aggs.emplace_back(count_valid()); + aggs.emplace_back(count_all()); + aggs.emplace_back(row_number()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// Test aggregations that yield columns of same type as input. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(min()); + aggs.emplace_back(max()); + aggs.emplace_back(lead()); + aggs.emplace_back(lag()); + aggs.emplace_back(udf()); + + rolling_output_type_matches(empty_input, aggs, type_id::STRING); + } + + /// For an input type `T`, `COLLECT_LIST` returns a column of type `list`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(collect_list()); + + rolling_output_type_matches(empty_input, aggs, type_to_id(), type_id::STRING); + } + + /// All other aggregations are unsupported. + { + auto unsupported_aggs = agg_vector_t{}; + unsupported_aggs.emplace_back(sum()); + unsupported_aggs.emplace_back(mean()); + + rolling_window_throws(empty_input, unsupported_aggs); + } +} + +TEST_F(RollingEmptyInputTest, Dictionaries) +{ + using namespace cudf; + using namespace cudf::test; + + auto input_col = dictionary_column_wrapper{}.release(); + auto empty_input = input_col->view(); + + /// Test aggregations that yield columns of type `size_type`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(count_valid()); + aggs.emplace_back(count_all()); + aggs.emplace_back(row_number()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// Test aggregations that yield columns of same type as input. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(min()); + aggs.emplace_back(max()); + aggs.emplace_back(lead()); + aggs.emplace_back(lag()); + aggs.emplace_back(udf()); + + rolling_output_type_matches(empty_input, aggs, type_id::DICTIONARY32); + } + + /// For an input type `T`, `COLLECT_LIST` returns a column of type `list`. 
+ { + auto aggs = agg_vector_t{}; + aggs.emplace_back(collect_list()); + + rolling_output_type_matches(empty_input, aggs, type_to_id(), type_id::DICTIONARY32); + } + + /// All other aggregations are unsupported. + { + auto unsupported_aggs = agg_vector_t{}; + unsupported_aggs.emplace_back(sum()); + unsupported_aggs.emplace_back(mean()); + + rolling_window_throws(empty_input, unsupported_aggs); + } +} + +TYPED_TEST(TypedRollingEmptyInputTest, Lists) +{ + using T = TypeParam; + using namespace cudf; + using namespace cudf::test; + + auto input_col = lists_column_wrapper{}.release(); + auto empty_input = input_col->view(); + + /// Test aggregations that yield columns of type `size_type`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(count_valid()); + aggs.emplace_back(count_all()); + aggs.emplace_back(row_number()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// Test aggregations that yield columns of same type as input. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(min()); + aggs.emplace_back(max()); + aggs.emplace_back(lead()); + aggs.emplace_back(lag()); + aggs.emplace_back(udf()); + + rolling_output_type_matches(empty_input, aggs, type_id::LIST, type_to_id()); + } + + /// For an input type `T`, `COLLECT_LIST` returns a column of type `list`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(collect_list()); + + rolling_output_type_matches(empty_input, aggs, type_id::LIST, type_id::LIST); + } + + /// All other aggregations are unsupported. 
+ { + auto unsupported_aggs = agg_vector_t{}; + unsupported_aggs.emplace_back(sum()); + unsupported_aggs.emplace_back(mean()); + + rolling_window_throws(empty_input, unsupported_aggs); + } +} + +TYPED_TEST(TypedRollingEmptyInputTest, Structs) +{ + using T = TypeParam; + using namespace cudf; + using namespace cudf::test; + + auto member_col = fixed_width_column_wrapper{}; + auto input_col = structs_column_wrapper{{member_col}}.release(); + auto empty_input = input_col->view(); + + /// Test aggregations that yield columns of type `size_type`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(count_valid()); + aggs.emplace_back(count_all()); + aggs.emplace_back(row_number()); + + rolling_output_type_matches(empty_input, aggs, type_to_id()); + } + + /// Test aggregations that yield columns of same type as input. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(min()); + aggs.emplace_back(max()); + aggs.emplace_back(lead()); + aggs.emplace_back(lag()); + aggs.emplace_back(udf()); + + rolling_output_type_matches(empty_input, aggs, type_id::STRUCT, type_to_id()); + } + + /// For an input type `T`, `COLLECT_LIST` returns a column of type `list`. + { + auto aggs = agg_vector_t{}; + aggs.emplace_back(collect_list()); + + rolling_output_type_matches(empty_input, aggs, type_id::LIST, type_id::STRUCT); + } + + /// All other aggregations are unsupported. + { + auto unsupported_aggs = agg_vector_t{}; + unsupported_aggs.emplace_back(sum()); + unsupported_aggs.emplace_back(mean()); + + rolling_window_throws(empty_input, unsupported_aggs); + } +}