From a584cdc5a872f8064d4bd7caa1322671dad52fc7 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Mon, 7 Mar 2022 14:44:58 -0700 Subject: [PATCH] Support `min` and `max` operations for structs in rolling window (#10332) This PR adds support for `min` and `max` operations in rolling window for STRUCT type. It also does some minor modifications to the existing code, such as renaming some variables and refining some comments. Partially addresses #8974. Authors: - Nghia Truong (https://github.com/ttnghia) Approvers: - Jake Hemstad (https://github.com/jrhemstad) - David Wendt (https://github.com/davidwendt) URL: https://github.com/rapidsai/cudf/pull/10332 --- cpp/src/reductions/struct_minmax_util.cuh | 10 +- cpp/src/rolling/rolling_detail.cuh | 296 ++++++++++++++-------- cpp/tests/rolling/rolling_test.cpp | 257 ++++++++++++++++++- 3 files changed, 440 insertions(+), 123 deletions(-) diff --git a/cpp/src/reductions/struct_minmax_util.cuh b/cpp/src/reductions/struct_minmax_util.cuh index 1de48ef482d..b0f2d50b0f5 100644 --- a/cpp/src/reductions/struct_minmax_util.cuh +++ b/cpp/src/reductions/struct_minmax_util.cuh @@ -35,15 +35,15 @@ namespace detail { struct row_arg_minmax_fn { size_type const num_rows; row_lexicographic_comparator const comp; - bool const arg_min; + bool const is_arg_min; row_arg_minmax_fn(table_device_view const& table, bool has_nulls, null_order const* null_precedence, - bool const arg_min) + bool const is_arg_min) : num_rows(table.num_rows()), comp(nullate::DYNAMIC{has_nulls}, table, table, nullptr, null_precedence), - arg_min(arg_min) + is_arg_min(is_arg_min) { } @@ -53,7 +53,7 @@ struct row_arg_minmax_fn { // `thrust::reduce_by_key` or `thrust::scan_by_key` will result in significant compile time. __attribute__((noinline)) __device__ auto operator()(size_type lhs_idx, size_type rhs_idx) const { - // The extra bounds checking is due to issue github.com/rapidsai/cudf/9156 and + // The extra bounds checking is due to issue github.com/rapidsai/cudf/issues/9156 and // github.com/NVIDIA/thrust/issues/1525 // where invalid random values may be passed here by thrust::reduce_by_key if (lhs_idx < 0 || lhs_idx >= num_rows) { return rhs_idx; } @@ -62,7 +62,7 @@ struct row_arg_minmax_fn { // Return `lhs_idx` iff: // row(lhs_idx) < row(rhs_idx) and finding ArgMin, or // row(lhs_idx) >= row(rhs_idx) and finding ArgMax. - return comp(lhs_idx, rhs_idx) == arg_min ? lhs_idx : rhs_idx; + return comp(lhs_idx, rhs_idx) == is_arg_min ? lhs_idx : rhs_idx; } }; diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 958da04e57c..a121e247258 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -22,6 +22,8 @@ #include "rolling/rolling_jit_detail.hpp" #include "rolling_detail.hpp" +#include + #include #include #include @@ -52,8 +54,10 @@ #include #include +#include #include #include +#include #include @@ -140,23 +144,35 @@ struct DeviceRolling { }; /** - * @brief Operator for applying an ARGMAX/ARGMIN rolling aggregation on a single window. + * @brief The base struct used for checking if the combination of input type and aggregation op is + * supported. */ template -struct DeviceRollingArgMinMax { +struct DeviceRollingArgMinMaxBase { size_type min_periods; + DeviceRollingArgMinMaxBase(size_type _min_periods) : min_periods(_min_periods) {} - // what operations do we support - template static constexpr bool is_supported() { - // strictly speaking, I think it would be ok to make this work - // for comparable types as well. but right now the only use case is - // for MIN/MAX on strings. - return std::is_same_v; + // Right now only support ARGMIN/ARGMAX of strings and structs. + auto const type_supported = + std::is_same_v || std::is_same_v; + auto const op_supported = op == aggregation::Kind::ARGMIN || op == aggregation::Kind::ARGMAX; + + return type_supported && op_supported; } +}; - DeviceRollingArgMinMax(size_type _min_periods) : min_periods(_min_periods) {} +/** + * @brief Operator for applying an ARGMAX/ARGMIN rolling aggregation on a single window for string. + */ +template +struct DeviceRollingArgMinMaxString : DeviceRollingArgMinMaxBase { + DeviceRollingArgMinMaxString(size_type _min_periods) + : DeviceRollingArgMinMaxBase(_min_periods) + { + } + using DeviceRollingArgMinMaxBase::min_periods; template bool __device__ operator()(column_device_view const& input, @@ -166,14 +182,17 @@ struct DeviceRollingArgMinMax { size_type end_index, size_type current_index) { - using AggOp = typename corresponding_operator::type; + auto constexpr default_output = (op == aggregation::ARGMIN) ? ARGMIN_SENTINEL : ARGMAX_SENTINEL; + + using InputType = cudf::string_view; + using AggOp = typename corresponding_operator::type; AggOp agg_op; // declare this as volatile to avoid some compiler optimizations that lead to incorrect results // for CUDA 10.0 and below (fixed in CUDA 10.1) volatile cudf::size_type count = 0; InputType val = AggOp::template identity(); - OutputType val_index = (op == aggregation::ARGMIN) ? ARGMIN_SENTINEL : ARGMAX_SENTINEL; + OutputType val_index = default_output; for (size_type j = start_index; j < end_index; j++) { if (!has_nulls || input.is_valid(j)) { @@ -185,9 +204,9 @@ struct DeviceRollingArgMinMax { } bool output_is_valid = (count >= min_periods); - // -1 will help identify null elements while gathering for Min and Max - // In case of count, this would be null, so doesn't matter. - output.element(current_index) = (output_is_valid) ? val_index : -1; + // Use the sentinel value (i.e., -1) for the output will help identify null elements while + // gathering for Min and Max. + output.element(current_index) = output_is_valid ? val_index : default_output; // The gather mask shouldn't contain null values, so // always return zero @@ -195,6 +214,50 @@ struct DeviceRollingArgMinMax { } }; +/** + * @brief Operator for applying an ARGMAX/ARGMIN rolling aggregation on a single window for struct. + */ +template +struct DeviceRollingArgMinMaxStruct : DeviceRollingArgMinMaxBase { + DeviceRollingArgMinMaxStruct(size_type _min_periods, Comparator const& _comp) + : DeviceRollingArgMinMaxBase(_min_periods), comp(_comp) + { + } + using DeviceRollingArgMinMaxBase::min_periods; + Comparator comp; + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const&, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + auto constexpr default_output = (op == aggregation::ARGMIN) ? ARGMIN_SENTINEL : ARGMAX_SENTINEL; + + auto const valid_count = + has_nulls ? thrust::count_if(thrust::seq, + thrust::make_counting_iterator(start_index), + thrust::make_counting_iterator(end_index), + [&input](size_type idx) { return input.is_valid_nocheck(idx); }) + : end_index - start_index; + + // Use the sentinel value (i.e., -1) for the output will help identify null elements while + // gathering for Min and Max. + output.element(current_index) = + (valid_count >= min_periods) ? thrust::reduce(thrust::seq, + thrust::make_counting_iterator(start_index), + thrust::make_counting_iterator(end_index), + size_type{start_index}, + comp) + : default_output; + + // The gather mask shouldn't contain null values, so always return true. + return true; + } +}; + /** * @brief Operator for applying a COUNT_VALID rolling aggregation on a single window. */ @@ -219,8 +282,8 @@ struct DeviceRollingCountValid { size_type end_index, size_type current_index) { - // declare this as volatile to avoid some compiler optimizations that lead to incorrect results - // for CUDA 10.0 and below (fixed in CUDA 10.1) + // declare this as volatile to avoid some compiler optimizations that lead to incorrect + // results for CUDA 10.0 and below (fixed in CUDA 10.1) volatile cudf::size_type count = 0; bool output_is_valid = ((end_index - start_index) >= min_periods); @@ -553,12 +616,12 @@ struct corresponding_rolling_operator { template struct corresponding_rolling_operator { - using type = DeviceRollingArgMinMax; + using type = DeviceRollingArgMinMaxBase; }; template struct corresponding_rolling_operator { - using type = DeviceRollingArgMinMax; + using type = DeviceRollingArgMinMaxBase; }; template @@ -577,13 +640,13 @@ struct corresponding_rolling_operator { }; template -struct corresponding_rolling_operator { - using type = DeviceRollingLead; +struct corresponding_rolling_operator { + using type = DeviceRollingVariance; }; template -struct corresponding_rolling_operator { - using type = DeviceRollingVariance; +struct corresponding_rolling_operator { + using type = DeviceRollingLead; }; template @@ -594,49 +657,34 @@ struct corresponding_rolling_operator { /** * @brief Functor for creating a device rolling operator based on input type and aggregation type. */ -template +template struct create_rolling_operator { - auto operator()(size_type min_periods, rolling_aggregation const& agg) - { - CUDF_FAIL("Invalid aggregation/type pair"); - } -}; - -template -struct create_rolling_operator< - InputType, - op, - std::enable_if_t::type::is_supported()>> { - template * = nullptr> auto operator()(size_type min_periods, rolling_aggregation const&) { - return typename corresponding_rolling_operator::type(min_periods); + return typename corresponding_rolling_operator::type(min_periods); } +}; - template * = nullptr> +template +struct create_rolling_operator { auto operator()(size_type min_periods, rolling_aggregation const& agg) { return DeviceRollingVariance{ min_periods, dynamic_cast(agg)._ddof}; } +}; - template * = nullptr> +template +struct create_rolling_operator { auto operator()(size_type, rolling_aggregation const& agg) { return DeviceRollingLead{ dynamic_cast(agg).row_offset}; } +}; - template * = nullptr> +template +struct create_rolling_operator { auto operator()(size_type, rolling_aggregation const& agg) { return DeviceRollingLag{ @@ -644,6 +692,31 @@ struct create_rolling_operator< } }; +template +struct create_rolling_operator< + InputType, + k, + typename std::enable_if_t && + (k == aggregation::Kind::ARGMIN || k == aggregation::Kind::ARGMAX)>> { + auto operator()(size_type min_periods, rolling_aggregation const&) + { + return DeviceRollingArgMinMaxString{min_periods}; + } +}; + +template +struct create_rolling_operator< + InputType, + k, + typename std::enable_if_t && + (k == aggregation::Kind::ARGMIN || k == aggregation::Kind::ARGMAX)>> { + template + auto operator()(size_type min_periods, Comparator const& comp) + { + return DeviceRollingArgMinMaxStruct{min_periods, comp}; + } +}; + /** * @brief Rolling window specific implementation of simple_aggregations_collector. * @@ -652,7 +725,7 @@ struct create_rolling_operator< * happens, the equivalent aggregation/type implementation of finalize() will perform * some postprocessing step. * - * An example of this would be applying a MIN aggregation to strings. This cannot be done + * An example of this would be applying a MIN aggregation to strings. This cannot be done * directly in the rolling operation, so instead the following happens: * * - the rolling_aggregation_preprocessor transforms the incoming MIN/string pair to @@ -662,8 +735,8 @@ struct create_rolling_operator< * - The rolling_aggregation_postprocessor then takes this gather map and performs a final * gather() on the input string data to generate the final output. * - * Another example is COLLECT_LIST. COLLECT_LIST is odd in that it doesn't go through the - * normal gpu rolling kernel at all. It has a completely custom implementation. So the + * Another example is COLLECT_LIST. COLLECT_LIST is odd in that it doesn't go through the + * normal gpu rolling kernel at all. It has a completely custom implementation. So the * following happens: * * - the rolling_aggregation_preprocessor transforms the COLLECT_LIST aggregation into nothing, @@ -687,8 +760,9 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre cudf::detail::min_aggregation const&) override { std::vector> aggs; - aggs.push_back(col_type.id() == type_id::STRING ? make_argmin_aggregation() - : make_min_aggregation()); + aggs.push_back(col_type.id() == type_id::STRING || col_type.id() == type_id::STRUCT + ? make_argmin_aggregation() + : make_min_aggregation()); return aggs; } @@ -700,8 +774,9 @@ class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggre cudf::detail::max_aggregation const&) override { std::vector> aggs; - aggs.push_back(col_type.id() == type_id::STRING ? make_argmax_aggregation() - : make_max_aggregation()); + aggs.push_back(col_type.id() == type_id::STRING || col_type.id() == type_id::STRUCT + ? make_argmax_aggregation() + : make_max_aggregation()); return aggs; } @@ -787,7 +862,7 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation // perform a final gather on the generated ARGMIN data void visit(cudf::detail::min_aggregation const&) override { - if (result_type.id() == type_id::STRING) { + if (result_type.id() == type_id::STRING || result_type.id() == type_id::STRUCT) { // The rows that represent null elements will have negative values in gather map, // and that's why nullify_out_of_bounds/ignore_out_of_bounds is true. auto output_table = detail::gather(table_view{{input}}, @@ -805,7 +880,7 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation // perform a final gather on the generated ARGMAX data void visit(cudf::detail::max_aggregation const&) override { - if (result_type.id() == type_id::STRING) { + if (result_type.id() == type_id::STRING || result_type.id() == type_id::STRUCT) { // The rows that represent null elements will have negative values in gather map, // and that's why nullify_out_of_bounds/ignore_out_of_bounds is true. auto output_table = detail::gather(table_view{{input}}, @@ -901,29 +976,24 @@ class rolling_aggregation_postprocessor final : public cudf::detail::aggregation /** * @brief Computes the rolling window function * - * @tparam InputType Datatype of `input` - * @tparam OutputType Datatype of `output` - * @tparam op The aggregation operator (enum value) + * @tparam OutputType Datatype of `output` * @tparam block_size CUDA block size for the kernel * @tparam has_nulls true if the input column has nulls * @tparam DeviceRollingOperator An operator that performs a single windowing operation * @tparam PrecedingWindowIterator iterator type (inferred) * @tparam FollowingWindowIterator iterator type (inferred) - * @param input Input column device view - * @param default_outputs A column of per-row default values to be returned instead - * of nulls for certain aggregation types. - * @param output Output column device view - * @param output_valid_count Output count of valid values - * @param device_operator The operator used to perform a single window operation + * @param[in] input Input column device view + * @param[in] default_outputs A column of per-row default values to be returned instead + * of nulls for certain aggregation types. + * @param[out] output Output column device view + * @param[out] output_valid_count Output count of valid values + * @param[in] device_operator The operator used to perform a single window operation * @param[in] preceding_window_begin Rolling window size iterator, accumulates from - * in_col[i-preceding_window] to in_col[i] inclusive + * in_col[i-preceding_window] to in_col[i] inclusive * @param[in] following_window_begin Rolling window size iterator in the forward - * direction, accumulates from in_col[i] to - * in_col[i+following_window] inclusive + * direction, accumulates from in_col[i] to in_col[i+following_window] inclusive */ -template {}(min_periods, agg); - - auto output = - make_fixed_width_column(output_type, input.size(), mask_state::UNINITIALIZED, stream, mr); - - cudf::mutable_column_view output_view = output->mutable_view(); - - size_type valid_count{0}; - { - using Type = device_storage_type_t; - using OutType = device_storage_type_t>; + auto const do_rolling = [&](auto const& device_op) { + auto output = make_fixed_width_column( + target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); - constexpr cudf::size_type block_size = 256; - cudf::detail::grid_1d grid(input.size(), block_size); + auto const d_inp_ptr = column_device_view::create(input, stream); + auto const d_default_out_ptr = column_device_view::create(default_outputs, stream); + auto const d_out_ptr = mutable_column_device_view::create(output->mutable_view(), stream); + auto d_valid_count = rmm::device_scalar{0, stream}; - auto input_device_view = column_device_view::create(input, stream); - auto output_device_view = mutable_column_device_view::create(output_view, stream); - auto default_outputs_device_view = column_device_view::create(default_outputs, stream); - - rmm::device_scalar device_valid_count{0, stream}; + auto constexpr block_size = 256; + auto const grid = cudf::detail::grid_1d(input.size(), block_size); + using OutType = device_storage_type_t>; if (input.has_nulls()) { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - device_operator, + gpu_rolling + <<>>(*d_inp_ptr, + *d_default_out_ptr, + *d_out_ptr, + d_valid_count.data(), + device_op, preceding_window_begin, following_window_begin); } else { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - device_operator, + gpu_rolling + <<>>(*d_inp_ptr, + *d_default_out_ptr, + *d_out_ptr, + d_valid_count.data(), + device_op, preceding_window_begin, following_window_begin); } - valid_count = device_valid_count.value(stream); - - // check the stream for debugging - CHECK_CUDA(stream.value()); + auto const valid_count = d_valid_count.value(stream); + output->set_null_count(output->size() - valid_count); + + return output; + }; // end do_rolling + + auto constexpr is_arg_minmax = + op == aggregation::Kind::ARGMIN || op == aggregation::Kind::ARGMAX; + + if constexpr (is_arg_minmax && std::is_same_v) { + // Using comp_generator to create a LESS operator for finding ARGMIN/ARGMAX of structs. + auto const comp_generator = + cudf::reduction::detail::comparison_binop_generator::create(input, stream); + auto const device_op = + create_rolling_operator{}(min_periods, comp_generator.binop()); + return do_rolling(device_op); + } else { // all the remaining rolling operations + auto const device_op = create_rolling_operator{}(min_periods, agg); + return do_rolling(device_op); } - - output->set_null_count(output->size() - valid_count); - - return output; } template #include #include +#include #include #include -#include #include #include #include #include -#include #include #include #include @@ -167,6 +166,252 @@ TEST_F(RollingStringTest, ZeroWindowSize) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, got_count->view()); } +// ========================================================================================= +class RollingStructTest : public cudf::test::BaseFixture { +}; + +TEST_F(RollingStructTest, NoNullStructsMinMaxCount) +{ + using namespace cudf::test::iterators; + using strings_col = cudf::test::strings_column_wrapper; + using ints_col = cudf::test::fixed_width_column_wrapper; + using structs_col = cudf::test::structs_column_wrapper; + + auto const do_test = [](auto const& input) { + auto const expected_min = [] { + auto child1 = strings_col{ + "This", "This", "being", "being", "being", "being", "column", "column", "column"}; + auto child2 = ints_col{1, 1, 5, 5, 5, 5, 9, 9, 9}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_max = [] { + auto child1 = strings_col{ + "rolling", "test", "test", "test", "test", "string", "string", "string", "string"}; + auto child2 = ints_col{3, 4, 4, 4, 4, 8, 8, 8, 8}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_count = ints_col{{3, 4, 4, 4, 4, 4, 4, 3, 2}, no_nulls()}; + auto constexpr preceeding = 2; + auto constexpr following = 2; + auto constexpr min_period = 1; + + auto const result_min = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_min_aggregation()); + auto const result_max = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_max_aggregation()); + auto const result_count_valid = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation()); + auto const result_count_all = cudf::rolling_window( + input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, result_count_valid->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, result_count_all->view()); + }; + + auto const input_no_sliced = [] { + auto child1 = + strings_col{"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"}; + auto child2 = ints_col{1, 2, 3, 4, 5, 6, 7, 8, 9}; + return structs_col{{child1, child2}}; + }(); + + auto const input_before_sliced = [] { + auto constexpr dont_care{0}; + auto child1 = strings_col{"1dont_care", + "1dont_care", + "@dont_care", + "This", + "is", + "rolling", + "test", + "being", + "operated", + "on", + "string", + "column", + "1dont_care", + "1dont_care", + "@dont_care"}; + auto child2 = ints_col{ + dont_care, dont_care, dont_care, 1, 2, 3, 4, 5, 6, 7, 8, 9, dont_care, dont_care, dont_care}; + return structs_col{{child1, child2}}; + }(); + auto const input_sliced = cudf::slice(input_before_sliced, {3, 12})[0]; + + do_test(input_no_sliced); + do_test(input_sliced); +} + +TEST_F(RollingStructTest, NullChildrenMinMaxCount) +{ + using namespace cudf::test::iterators; + using strings_col = cudf::test::strings_column_wrapper; + using ints_col = cudf::test::fixed_width_column_wrapper; + using structs_col = cudf::test::structs_column_wrapper; + + auto const input = [] { + auto child1 = strings_col{ + {"This", "" /*NULL*/, "" /*NULL*/, "test", "" /*NULL*/, "operated", "on", "string", "column"}, + nulls_at({1, 2, 4})}; + auto child2 = ints_col{1, 2, 3, 4, 5, 6, 7, 8, 9}; + return structs_col{{child1, child2}}; + }(); + + auto const expected_min = [] { + auto child1 = strings_col{{"" /*NULL*/, + "" /*NULL*/, + "" /*NULL*/, + "" /*NULL*/, + "" /*NULL*/, + "" /*NULL*/, + "column", + "column", + "column"}, + nulls_at({0, 1, 2, 3, 4, 5})}; + auto child2 = ints_col{2, 2, 2, 3, 5, 5, 9, 9, 9}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_max = [] { + auto child1 = + strings_col{"This", "test", "test", "test", "test", "string", "string", "string", "string"}; + auto child2 = ints_col{1, 4, 4, 4, 4, 8, 8, 8, 8}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_count = ints_col{{3, 4, 4, 4, 4, 4, 4, 3, 2}, no_nulls()}; + auto constexpr preceeding = 2; + auto constexpr following = 2; + auto constexpr min_period = 1; + + auto const result_min = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_min_aggregation()); + + auto const result_max = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_max_aggregation()); + + auto const result_count_valid = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation()); + auto const result_count_all = cudf::rolling_window( + input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, result_count_valid->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count, result_count_all->view()); +} + +TEST_F(RollingStructTest, NullParentMinMaxCount) +{ + using namespace cudf::test::iterators; + using strings_col = cudf::test::strings_column_wrapper; + using ints_col = cudf::test::fixed_width_column_wrapper; + using structs_col = cudf::test::structs_column_wrapper; + + auto constexpr null{0}; + auto const input = [] { + auto child1 = strings_col{"This", + "" /*NULL*/, + "" /*NULL*/, + "test", + "" /*NULL*/, + "operated", + "on", + "string", + "" /*NULL*/}; + auto child2 = ints_col{1, null, null, 4, null, 6, 7, 8, null}; + return structs_col{{child1, child2}, nulls_at({1, 2, 4, 8})}; + }(); + + auto const expected_min = [] { + auto child1 = strings_col{"This", "This", "test", "operated", "on", "on", "on", "on", "string"}; + auto child2 = ints_col{1, 1, 4, 6, 7, 7, 7, 7, 8}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_max = [] { + auto child1 = + strings_col{"This", "test", "test", "test", "test", "string", "string", "string", "string"}; + auto child2 = ints_col{1, 4, 4, 4, 4, 8, 8, 8, 8}; + return structs_col{{child1, child2}, no_nulls()}; + }(); + + auto const expected_count_valid = ints_col{{1, 2, 1, 2, 3, 3, 3, 2, 1}, no_nulls()}; + auto const expected_count_all = ints_col{{3, 4, 4, 4, 4, 4, 4, 3, 2}, no_nulls()}; + auto constexpr preceeding = 2; + auto constexpr following = 2; + auto constexpr min_period = 1; + + auto const result_min = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_min_aggregation()); + + auto const result_max = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_max_aggregation()); + + auto const result_count_valid = + cudf::rolling_window(input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation()); + auto const result_count_all = cudf::rolling_window( + input, + preceeding, + following, + min_period, + *cudf::make_count_aggregation(cudf::null_policy::INCLUDE)); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_valid, result_count_valid->view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, result_count_all->view()); +} + +// ========================================================================================= template class RollingTest : public cudf::test::BaseFixture { protected: @@ -1102,10 +1347,10 @@ TEST_F(RollingTestUdf, DynamicWindow) thrust::make_constant_iterator(true)); auto prec = cudf::detail::make_counting_transform_iterator( - 0, [size] __device__(size_type row) { return row % 2 + 2; }); + 0, [] __device__(size_type row) { return row % 2 + 2; }); auto follow = cudf::detail::make_counting_transform_iterator( - 0, [size] __device__(size_type row) { return row % 2; }); + 0, [] __device__(size_type row) { return row % 2; }); fixed_width_column_wrapper preceding(prec, prec + size); fixed_width_column_wrapper following(follow, follow + size); @@ -1118,7 +1363,7 @@ TEST_F(RollingTestUdf, DynamicWindow) }); auto valid = cudf::detail::make_counting_transform_iterator( - 0, [size] __device__(size_type row) { return row != 0; }); + 0, [] __device__(size_type row) { return row != 0; }); fixed_width_column_wrapper expected{start, start + size, valid};