From 14b0900563c8701d834fa4287c55085f05dc20c2 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 29 Jan 2021 16:21:14 -0800 Subject: [PATCH] Implement COLLECT rolling window aggregation (#7189) Closes #7133. This is an implementation of the `COLLECT` aggregation in the context of rolling window functions. This enables the collection of rows (of type `T`) within specified window boundaries into a list column (containing elements of type `T`). In this context, one list row would be generated per input row. E.g. Consider the following example: ```c++ auto input_col = fixed_width_column_wrapper{70, 71, 72, 73, 74}; ``` Calling `rolling_window()` with `preceding=2`, `following=1`, `min_periods=1` produces the following: ```c++ auto output_col = cudf::rolling_window(input_col, 2, 1, 1, collect_aggr); // == [ [70,71], [70,71,72], [71,72,73], [72,73,74], [73,74] ] ``` `COLLECT` is supported with `rolling_window()`, `grouped_rolling_window()`, and `grouped_time_range_rolling_window()`, across primitive types and arbitrarily nested lists and structs. `min_periods` is also honoured: If the number of observations is fewer than min_periods, the resulting list row is null. Authors: - MithunR (@mythrocks) Approvers: - Keith Kraus (@kkraus14) - Vukasin Milovanovic (@vuule) - Ram (Ramakrishna Prabhu) (@rgsl888prabhu) URL: https://github.com/rapidsai/cudf/pull/7189 --- cpp/include/cudf/lists/detail/scatter.cuh | 25 +- cpp/include/cudf/lists/detail/utilities.cuh | 46 ++ cpp/src/rolling/rolling_detail.cuh | 243 ++++++- cpp/src/rolling/rolling_detail.hpp | 20 +- cpp/tests/CMakeLists.txt | 8 + cpp/tests/collect_list/collect_list_test.cpp | 705 +++++++++++++++++++ 6 files changed, 1014 insertions(+), 33 deletions(-) create mode 100644 cpp/include/cudf/lists/detail/utilities.cuh create mode 100644 cpp/tests/collect_list/collect_list_test.cpp diff --git a/cpp/include/cudf/lists/detail/scatter.cuh b/cpp/include/cudf/lists/detail/scatter.cuh index 53f0472fedc..32f6cc6db7a 100644 --- a/cpp/include/cudf/lists/detail/scatter.cuh +++ b/cpp/include/cudf/lists/detail/scatter.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -160,28 +161,6 @@ rmm::device_uvector list_vector_from_column( return vector; } -/** - * @brief Fetch the number of rows in a lists column's child given its offsets column. - * - * @param list_offsets Offsets child of a lists column - * @param stream The cuda-stream to synchronize on, when reading from device memory - * @return cudf::size_type The last element in the list_offsets column, indicating - * the number of rows in the lists-column's child. - */ -cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets, - rmm::cuda_stream_view stream) -{ - // Number of rows in child-column == last offset value. - cudf::size_type num_child_rows{}; - CUDA_TRY(cudaMemcpyAsync(&num_child_rows, - list_offsets.data() + list_offsets.size() - 1, - sizeof(cudf::size_type), - cudaMemcpyDeviceToHost, - stream.value())); - stream.synchronize(); - return num_child_rows; -} - /** * @brief Constructs null mask for a scattered list's child column * diff --git a/cpp/include/cudf/lists/detail/utilities.cuh b/cpp/include/cudf/lists/detail/utilities.cuh new file mode 100644 index 00000000000..ccee9b0d5d9 --- /dev/null +++ b/cpp/include/cudf/lists/detail/utilities.cuh @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace cudf { +namespace detail { + +/** + * @brief Fetch the number of rows in a lists column's child given its offsets column. + * + * @param[in] list_offsets Offsets child of a lists column + * @param[in] stream The cuda-stream to synchronize on, when reading from device memory + * @return cudf::size_type The number of child rows in the lists column + */ +static cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets, + rmm::cuda_stream_view stream) +{ + // Number of rows in child-column == last offset value. + cudf::size_type num_child_rows{}; + CUDA_TRY(cudaMemcpyAsync(&num_child_rows, + list_offsets.data() + list_offsets.size() - 1, + sizeof(cudf::size_type), + cudaMemcpyDeviceToHost, + stream.value())); + stream.synchronize(); + return num_child_rows; +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 5abef95310b..8a0f5f8002d 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,9 +33,12 @@ #include #include #include +#include #include #include +#include #include +#include #include #include #include @@ -312,7 +315,7 @@ template ::value and !(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL || op == aggregation::ROW_NUMBER || op == aggregation::LEAD || - op == aggregation::LAG)>* = nullptr> + op == aggregation::LAG || op == aggregation::COLLECT)>* = nullptr> bool __device__ process_rolling_window(column_device_view input, column_device_view ignored_default_outputs, mutable_column_device_view output, @@ -810,7 +813,8 @@ struct rolling_window_launcher { template - std::enable_if_t> operator()(column_view const& input, column_view const& default_outputs, @@ -891,6 +895,239 @@ struct rolling_window_launcher { stream, mr); } + + /** + * @brief Creates the offsets child of the result of the `COLLECT` window aggregation + * + * Given the input column, the preceding/following window bounds, and `min_periods`, + * the sizes of each list row may be computed. These values can then be used to + * calculate the offsets for the result of `COLLECT`. + * + * Note: If `min_periods` exceeds the number of observations for a window, the size + * is set to `0` (since the result is `null`). + */ + template + std::unique_ptr create_collect_offsets(size_type input_size, + PrecedingIter preceding_begin, + FollowingIter following_begin, + size_type min_periods, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + // Materialize offsets column. + auto static constexpr size_data_type = data_type{type_to_id()}; + auto sizes = + make_fixed_width_column(size_data_type, input_size, mask_state::UNALLOCATED, stream, mr); + auto mutable_sizes = sizes->mutable_view(); + + // Consider the following preceding/following values: + // preceding = [1,2,2,2,2] + // following = [1,1,1,1,0] + // The sum of the vectors should yield the window sizes: + // prec + foll = [2,3,3,3,2] + // + // If min_periods=2, all rows have at least `min_periods` observations. + // But if min_periods=3, rows at indices 0 and 4 have too few observations, and must return + // null. The sizes at these positions must be 0, i.e. + // prec + foll = [0,3,3,3,0] + thrust::transform(rmm::exec_policy(stream), + preceding_begin, + preceding_begin + input_size, + following_begin, + mutable_sizes.begin(), + [min_periods] __device__(auto preceding, auto following) { + return (preceding + following) < min_periods ? 0 : (preceding + following); + }); + + // Convert `sizes` to an offsets column, via inclusive_scan(): + return strings::detail::make_offsets_child_column( + sizes->view().begin(), sizes->view().end(), stream, mr); + } + + /** + * @brief Generate mapping of each row in the COLLECT result's child column + * to the index of the row it belongs to. + * + * If + * input col == [A,B,C,D,E] + * and preceding == [1,2,2,2,2], + * and following == [1,1,1,1,0], + * then, + * collect result == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ] + * i.e. result offset column == [0,2,5,8,11,13], + * and result child column == [A,B,A,B,C,B,C,D,C,D,E,D,E]. + * Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4] + */ + std::unique_ptr get_list_child_to_list_row_mapping(cudf::column_view const& offsets, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + auto static constexpr size_data_type = data_type{type_to_id()}; + + // First, reduce offsets column by key, to identify the number of times + // an offset appears. + // Next, scatter the count for each offset (except the first and last), + // into a column of N `0`s, where N == number of child rows. + // For the example above: + // offsets == [0, 2, 5, 8, 11, 13] + // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] + // + // If the above example had an empty list row at index 2, + // the same columns would look as follows: + // offsets == [0, 2, 5, 5, 8, 11, 13] + // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] + // + // Note: To correctly handle null list rows at the beginning of + // the output column, care must be taken to skip the first `0` + // in the offsets column, when running `reduce_by_key()`. + // This accounts for the `0` added by default to the offsets + // column, marking the beginning of the column. + + auto num_child_rows = get_num_child_rows(offsets, stream); + + auto scatter_values = + make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); + auto scatter_keys = + make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); + auto reduced_by_key = + thrust::reduce_by_key(rmm::exec_policy(stream), + offsets.template begin() + 1, // Skip first 0 in offsets. + offsets.template end(), + thrust::make_constant_iterator(1), + scatter_keys->mutable_view().template begin(), + scatter_values->mutable_view().template begin()); + auto scatter_values_end = reduced_by_key.second; + auto scatter_output = + make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); + thrust::fill_n(rmm::exec_policy(stream), + scatter_output->mutable_view().template begin(), + num_child_rows, + 0); // [0,0,0,...0] + thrust::scatter( + rmm::exec_policy(stream), + scatter_values->mutable_view().template begin(), + scatter_values_end, + scatter_keys->view().template begin(), + scatter_output->mutable_view().template begin()); // [0,0,1,0,0,1,...] + + // Next, generate mapping with inclusive_scan() on scatter() result. + // For the example above: + // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] + // inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4] + // + // For the case with an empty list at index 3: + // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] + // inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5] + auto per_row_mapping = + make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); + thrust::inclusive_scan(rmm::exec_policy(stream), + scatter_output->view().template begin(), + scatter_output->view().template end(), + per_row_mapping->mutable_view().template begin()); + return per_row_mapping; + } + + /** + * @brief Create gather map to generate the child column of the result of + * the `COLLECT` window aggregation. + */ + template + std::unique_ptr create_collect_gather_map(column_view const& child_offsets, + column_view const& per_row_mapping, + PrecedingIter preceding_iter, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + auto gather_map = make_fixed_width_column(data_type{type_to_id()}, + per_row_mapping.size(), + mask_state::UNALLOCATED, + stream, + mr); + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(per_row_mapping.size()), + gather_map->mutable_view().template begin(), + [d_offsets = + child_offsets.template begin(), // E.g. [0, 2, 5, 8, 11, 13] + d_groups = + per_row_mapping.template begin(), // E.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4] + d_prev = preceding_iter] __device__(auto i) { + auto group = d_groups[i]; + auto group_start_offset = d_offsets[group]; + auto relative_index = i - group_start_offset; + + return (group - d_prev[group] + 1) + relative_index; + }); + return gather_map; + } + + template + std::enable_if_t<(op == aggregation::COLLECT), std::unique_ptr> operator()( + column_view const& input, + column_view const& default_outputs, + PrecedingIter preceding_begin_raw, + FollowingIter following_begin_raw, + size_type min_periods, + std::unique_ptr const& agg, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) + { + CUDF_EXPECTS(default_outputs.is_empty(), + "COLLECT window function does not support default values."); + + if (input.is_empty()) return empty_like(input); + + // Fix up preceding/following iterators to respect column boundaries, + // similar to gpu_rolling(). + // `rolling_window()` does not fix up preceding/following so as not to read past + // column boundaries. + // `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do. + auto preceding_begin = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), [preceding_begin_raw] __device__(auto i) { + return thrust::min(preceding_begin_raw[i], i + 1); + }); + auto following_begin = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + [following_begin_raw, size = input.size()] __device__(auto i) { + return thrust::min(following_begin_raw[i], size - i - 1); + }); + + // Materialize collect list's offsets. + auto offsets = create_collect_offsets( + input.size(), preceding_begin, following_begin, min_periods, stream, mr); + + // Map each element of the collect() result's child column + // to the index where it appears in the input. + auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr); + + // Generate gather map to produce the collect() result's child column. + auto gather_map = create_collect_gather_map( + offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr); + + // gather(), to construct child column. + auto gather_output = + cudf::gather(table_view{std::vector{input}}, gather_map->view()); + + rmm::device_buffer null_mask; + size_type null_count; + std::tie(null_mask, null_count) = valid_if( + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.size()), + [preceding_begin, following_begin, min_periods] __device__(auto i) { + return (preceding_begin[i] + following_begin[i]) >= min_periods; + }, + stream, + mr); + + return make_lists_column(input.size(), + std::move(offsets), + std::move(gather_output->release()[0]), + null_count, + std::move(null_mask), + stream, + mr); + } }; struct dispatch_rolling { diff --git a/cpp/src/rolling/rolling_detail.hpp b/cpp/src/rolling/rolling_detail.hpp index 1d1c42190ec..d7fa92f1978 100644 --- a/cpp/src/rolling/rolling_detail.hpp +++ b/cpp/src/rolling/rolling_detail.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ static constexpr bool is_rolling_supported() (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or (op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or - (op == aggregation::LAG); + (op == aggregation::LAG) or (op == aggregation::COLLECT); constexpr bool is_valid_numeric_agg = (cudf::is_numeric() or cudf::is_duration() or @@ -53,21 +53,27 @@ static constexpr bool is_rolling_supported() } else if (cudf::is_timestamp()) { return (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG); + (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or + (op == aggregation::LAG) or (op == aggregation::COLLECT); } else if (cudf::is_fixed_point()) { return (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG); + (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or + (op == aggregation::LAG) or (op == aggregation::COLLECT); } else if (std::is_same()) { return (op == aggregation::MIN) or (op == aggregation::MAX) or (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER); + (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT); } else if (std::is_same()) { return (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER); - } else + (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT); + } else if (std::is_same()) { + // TODO: Add support for COUNT_VALID, COUNT_ALL, ROW_NUMBER. + return op == aggregation::COLLECT; + } else { return false; + } } // return true if this Op is specialized for strings. diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 28e44f9508a..8395a3cc1f2 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -492,6 +492,14 @@ set(LEAD_LAG_TEST_SRC ConfigureTest(LEAD_LAG_TEST "${LEAD_LAG_TEST_SRC}") +################################################################################################### +# - collect_list rolling tests --------------------------------------------------------------------------------- + +set(COLLECT_LIST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/collect_list/collect_list_test.cpp") + +ConfigureTest(COLLECT_LIST_TEST "${COLLECT_LIST_SRC}") + ################################################################################################### # - filling test ---------------------------------------------------------------------------------- diff --git a/cpp/tests/collect_list/collect_list_test.cpp b/cpp/tests/collect_list/collect_list_test.cpp new file mode 100644 index 00000000000..8021d7171b3 --- /dev/null +++ b/cpp/tests/collect_list/collect_list_test.cpp @@ -0,0 +1,705 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +struct CollectListTest : public cudf::test::BaseFixture { +}; + +template +struct TypedCollectListTest : public CollectListTest { +}; + +using TypesForTest = cudf::test:: + Concat; + +TYPED_TEST_CASE(TypedCollectListTest, TypesForTest); + +TYPED_TEST(TypedCollectListTest, BasicRollingWindow) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const input_column = fixed_width_column_wrapper{10, 11, 12, 13, 14}; + + auto const prev_column = fixed_width_column_wrapper{1, 2, 2, 2, 2}; + auto const foll_column = fixed_width_column_wrapper{1, 1, 1, 1, 0}; + + EXPECT_EQ(static_cast(prev_column).size(), + static_cast(foll_column).size()); + + auto const result_column_based_window = + rolling_window(input_column, prev_column, foll_column, 1, make_collect_aggregation()); + + auto const expected_result = + lists_column_wrapper{ + {10, 11}, + {10, 11, 12}, + {11, 12, 13}, + {12, 13, 14}, + {13, 14}, + } + .release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result_column_based_window->view()); + + auto const result_fixed_window = + rolling_window(input_column, 2, 1, 1, make_collect_aggregation()); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result_fixed_window->view()); +} + +TYPED_TEST(TypedCollectListTest, RollingWindowWithEmptyOutputLists) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const input_column = fixed_width_column_wrapper{10, 11, 12, 13, 14, 15}; + + auto const prev_column = fixed_width_column_wrapper{1, 2, 2, 0, 2, 2}; + auto const foll_column = fixed_width_column_wrapper{1, 1, 1, 0, 1, 0}; + + EXPECT_EQ(static_cast(prev_column).size(), + static_cast(foll_column).size()); + + auto const result_column_based_window = + rolling_window(input_column, prev_column, foll_column, 0, make_collect_aggregation()); + + auto const expected_result = + lists_column_wrapper{ + {10, 11}, + {10, 11, 12}, + {11, 12, 13}, + {}, + {13, 14, 15}, + {14, 15}, + } + .release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result_column_based_window->view()); +} + +TYPED_TEST(TypedCollectListTest, RollingWindowWithEmptyOutputListsAtEnds) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const input_column = fixed_width_column_wrapper{0, 1, 2, 3, 4, 5}; + + auto const prev_column = fixed_width_column_wrapper{0, 2, 2, 2, 2, 0}; + auto foll_column = fixed_width_column_wrapper{0, 1, 1, 1, 1, 0}; + + auto const result = + rolling_window(input_column, prev_column, foll_column, 0, make_collect_aggregation()); + + auto const expected_result = + lists_column_wrapper{{}, {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, {3, 4, 5}, {}}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, RollingWindowHonoursMinPeriods) +{ + // Test that when the number of observations is fewer than min_periods, + // the result is null. + + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const input_column = fixed_width_column_wrapper{0, 1, 2, 3, 4, 5}; + auto const num_elements = static_cast(input_column).size(); + + auto preceding = 2; + auto following = 1; + auto min_periods = 3; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {{}, {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, {3, 4, 5}, {}}, + make_counting_transform_iterator(0, [num_elements](auto i) { + return i != 0 && i != (num_elements - 1); + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + + preceding = 2; + following = 2; + min_periods = 4; + + auto result_2 = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + auto expected_result_2 = lists_column_wrapper{ + {{}, {0, 1, 2, 3}, {1, 2, 3, 4}, {2, 3, 4, 5}, {}, {}}, + make_counting_transform_iterator(0, [num_elements](auto i) { + return i != 0 && i < 4; + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result_2->view(), result_2->view()); +} + +TYPED_TEST(TypedCollectListTest, RollingWindowWithNullInputsHonoursMinPeriods) +{ + // Test that when the number of observations is fewer than min_periods, + // the result is null. + // Input column has null inputs. + + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const input_column = + fixed_width_column_wrapper{{0, 1, 2, 3, 4, 5}, {1, 0, 1, 1, 0, 1}}; + // auto const num_elements = static_cast(input_column).size(); + + { + // One result row at each end should be null. + auto preceding = 2; + auto following = 1; + auto min_periods = 3; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto expected_result_child_values = std::vector{0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5}; + auto expected_result_child_validity = std::vector{1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1}; + auto expected_result_child = + fixed_width_column_wrapper(expected_result_child_values.begin(), + expected_result_child_values.end(), + expected_result_child_validity.begin()); + auto expected_offsets = fixed_width_column_wrapper{0, 0, 3, 6, 9, 12, 12}.release(); + auto expected_num_rows = expected_offsets->size() - 1; + auto null_mask_iter = make_counting_transform_iterator( + size_type{0}, [expected_num_rows](auto i) { return i != 0 && i != (expected_num_rows - 1); }); + + auto expected_result = make_lists_column( + expected_num_rows, + std::move(expected_offsets), + expected_result_child.release(), + 2, + cudf::test::detail::make_null_mask(null_mask_iter, null_mask_iter + expected_num_rows)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + } + + { + // First result row, and the last two result rows should be null. + auto preceding = 2; + auto following = 2; + auto min_periods = 4; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto expected_result_child_values = std::vector{0, 1, 2, 3, 1, 2, 3, 4, 2, 3, 4, 5}; + auto expected_result_child_validity = std::vector{1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1}; + auto expected_result_child = + fixed_width_column_wrapper(expected_result_child_values.begin(), + expected_result_child_values.end(), + expected_result_child_validity.begin()); + + auto expected_offsets = fixed_width_column_wrapper{0, 0, 4, 8, 12, 12, 12}.release(); + auto expected_num_rows = expected_offsets->size() - 1; + auto null_mask_iter = make_counting_transform_iterator( + size_type{0}, [expected_num_rows](auto i) { return i > 0 && i < 4; }); + + auto expected_result = make_lists_column( + expected_num_rows, + std::move(expected_offsets), + expected_result_child.release(), + 3, + cudf::test::detail::make_null_mask(null_mask_iter, null_mask_iter + expected_num_rows)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + } +} + +TEST_F(CollectListTest, RollingWindowHonoursMinPeriodsOnStrings) +{ + // Test that when the number of observations is fewer than min_periods, + // the result is null. + + using namespace cudf; + using namespace cudf::test; + + auto const input_column = strings_column_wrapper{"0", "1", "2", "3", "4", "5"}; + auto const num_elements = static_cast(input_column).size(); + + auto preceding = 2; + auto following = 1; + auto min_periods = 3; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {{}, {"0", "1", "2"}, {"1", "2", "3"}, {"2", "3", "4"}, {"3", "4", "5"}, {}}, + make_counting_transform_iterator(0, [num_elements](auto i) { + return i != 0 && i != (num_elements - 1); + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + + preceding = 2; + following = 2; + min_periods = 4; + + auto result_2 = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + auto expected_result_2 = lists_column_wrapper{ + {{}, {"0", "1", "2", "3"}, {"1", "2", "3", "4"}, {"2", "3", "4", "5"}, {}, {}}, + make_counting_transform_iterator(0, [num_elements](auto i) { + return i != 0 && i < 4; + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result_2->view(), result_2->view()); +} + +TEST_F(CollectListTest, RollingWindowHonoursMinPeriodsWithDecimal) +{ + // Test that when the number of observations is fewer than min_periods, + // the result is null. + + using namespace cudf; + using namespace cudf::test; + + auto const input_iter = make_counting_transform_iterator(0, thrust::identity{}); + auto const input_column = + fixed_point_column_wrapper{input_iter, input_iter + 6, numeric::scale_type{0}}; + + { + // One result row at each end should be null. + auto preceding = 2; + auto following = 1; + auto min_periods = 3; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto expected_result_child_values = std::vector{0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5}; + auto expected_result_child = + fixed_point_column_wrapper{expected_result_child_values.begin(), + expected_result_child_values.end(), + numeric::scale_type{0}}; + auto expected_offsets = fixed_width_column_wrapper{0, 0, 3, 6, 9, 12, 12}.release(); + auto expected_num_rows = expected_offsets->size() - 1; + auto null_mask_iter = make_counting_transform_iterator( + size_type{0}, [expected_num_rows](auto i) { return i != 0 && i != (expected_num_rows - 1); }); + + auto expected_result = make_lists_column( + expected_num_rows, + std::move(expected_offsets), + expected_result_child.release(), + 2, + cudf::test::detail::make_null_mask(null_mask_iter, null_mask_iter + expected_num_rows)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + } + + { + // First result row, and the last two result rows should be null. + auto preceding = 2; + auto following = 2; + auto min_periods = 4; + auto const result = + rolling_window(input_column, preceding, following, min_periods, make_collect_aggregation()); + + auto expected_result_child_values = std::vector{0, 1, 2, 3, 1, 2, 3, 4, 2, 3, 4, 5}; + auto expected_result_child = + fixed_point_column_wrapper{expected_result_child_values.begin(), + expected_result_child_values.end(), + numeric::scale_type{0}}; + auto expected_offsets = fixed_width_column_wrapper{0, 0, 4, 8, 12, 12, 12}.release(); + auto expected_num_rows = expected_offsets->size() - 1; + auto null_mask_iter = make_counting_transform_iterator( + size_type{0}, [expected_num_rows](auto i) { return i > 0 && i < 4; }); + + auto expected_result = make_lists_column( + expected_num_rows, + std::move(expected_offsets), + expected_result_child.release(), + 3, + cudf::test::detail::make_null_mask(null_mask_iter, null_mask_iter + expected_num_rows)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); + } +} + +TYPED_TEST(TypedCollectListTest, BasicGroupedRollingWindow) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = + fixed_width_column_wrapper{10, 11, 12, 13, 14, 20, 21, 22, 23}; + + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 1; + auto const result = grouped_rolling_window(table_view{std::vector{group_column}}, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {10, 11}, + {10, 11, 12}, + {11, 12, 13}, + {12, 13, 14}, + {13, 14}, + {20, 21}, + {20, 21, 22}, + {21, 22, 23}, + {22, 23}}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, BasicGroupedRollingWindowWithNulls) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = fixed_width_column_wrapper{ + {10, 11, 12, 13, 14, 20, 21, 22, 23}, {1, 0, 1, 1, 1, 1, 0, 1, 1}}; + + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 1; + auto const result = grouped_rolling_window(table_view{std::vector{group_column}}, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto expected_child = fixed_width_column_wrapper{ + {10, 11, 10, 11, 12, 11, 12, 13, 12, 13, 14, 13, 14, 20, 21, 20, 21, 22, 21, 22, 23, 22, 23}, + {1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1}}; + + auto expected_offsets = fixed_width_column_wrapper{0, 2, 5, 8, 11, 13, 15, 18, 21, 23}; + + auto expected_result = make_lists_column(static_cast(group_column).size(), + expected_offsets.release(), + expected_child.release(), + 0, + {}); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, BasicGroupedTimeRangeRollingWindow) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = + fixed_width_column_wrapper{10, 11, 12, 13, 14, 20, 21, 22, 23}; + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 1; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {10, 11, 12, 13}, + {10, 11, 12, 13}, + {10, 11, 12, 13, 14}, + {10, 11, 12, 13, 14}, + {10, 11, 12, 13, 14}, + {20}, + {21, 22}, + {21, 22, 23}, + {21, 22, 23}}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TEST_F(CollectListTest, BasicGroupedTimeRangeRollingWindowOnStrings) +{ + using namespace cudf; + using namespace cudf::test; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = + strings_column_wrapper{"10", "11", "12", "13", "14", "20", "21", "22", "23"}; + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 1; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {"10", "11", "12", "13"}, + {"10", "11", "12", "13"}, + {"10", "11", "12", "13", "14"}, + {"10", "11", "12", "13", "14"}, + {"10", "11", "12", "13", "14"}, + {"20"}, + {"21", "22"}, + {"21", "22", "23"}, + {"21", "22", "23"}}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, BasicGroupedTimeRangeRollingWindowOnStructs) +{ + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto numeric_member_column = + fixed_width_column_wrapper{10, 11, 12, 13, 14, 20, 21, 22, 23}; + auto string_member_column = + strings_column_wrapper{"10", "11", "12", "13", "14", "20", "21", "22", "23"}; + auto struct_members = std::vector>{}; + struct_members.emplace_back(numeric_member_column.release()); + struct_members.emplace_back(string_member_column.release()); + auto const struct_column = make_structs_column(9, std::move(struct_members), 0, {}); + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 1; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + struct_column->view(), + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto expected_numeric_column = fixed_width_column_wrapper{ + 10, 11, 12, 13, 10, 11, 12, 13, 10, 11, 12, 13, 14, 10, 11, 12, + 13, 14, 10, 11, 12, 13, 14, 20, 21, 22, 21, 22, 23, 21, 22, 23}; + + auto expected_string_column = strings_column_wrapper{ + "10", "11", "12", "13", "10", "11", "12", "13", "10", "11", "12", "13", "14", "10", "11", "12", + "13", "14", "10", "11", "12", "13", "14", "20", "21", "22", "21", "22", "23", "21", "22", "23"}; + + auto expected_struct_members = std::vector>{}; + expected_struct_members.emplace_back(expected_numeric_column.release()); + expected_struct_members.emplace_back(expected_string_column.release()); + + auto expected_structs_column = make_structs_column(32, std::move(expected_struct_members), 0, {}); + auto expected_offsets_column = + fixed_width_column_wrapper{0, 4, 8, 13, 18, 23, 24, 26, 29, 32}.release(); + auto expected_result = make_lists_column( + 9, std::move(expected_offsets_column), std::move(expected_structs_column), 0, {}); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, GroupedTimeRangeRollingWindowWithMinPeriods) +{ + // Test that min_periods is honoured. + // i.e. output row is null when min_periods exceeds number of observations. + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = + fixed_width_column_wrapper{10, 11, 12, 13, 14, 20, 21, 22, 23}; + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 4; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {{10, 11, 12, 13}, + {10, 11, 12, 13}, + {10, 11, 12, 13, 14}, + {10, 11, 12, 13, 14}, + {10, 11, 12, 13, 14}, + {}, + {}, + {}, + {}}, + make_counting_transform_iterator(0, [](auto i) { + return i < 5; + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TEST_F(CollectListTest, GroupedTimeRangeRollingWindowOnStringsWithMinPeriods) +{ + // Test that min_periods is honoured. + // i.e. output row is null when min_periods exceeds number of observations. + using namespace cudf; + using namespace cudf::test; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto const input_column = + strings_column_wrapper{"10", "11", "12", "13", "14", "20", "21", "22", "23"}; + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 4; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + input_column, + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto const expected_result = lists_column_wrapper{ + {{"10", "11", "12", "13"}, + {"10", "11", "12", "13"}, + {"10", "11", "12", "13", "14"}, + {"10", "11", "12", "13", "14"}, + {"10", "11", "12", "13", "14"}, + {}, + {}, + {}, + {}}, + make_counting_transform_iterator(0, [](auto i) { + return i < 5; + })}.release(); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +TYPED_TEST(TypedCollectListTest, GroupedTimeRangeRollingWindowOnStructsWithMinPeriods) +{ + // Test that min_periods is honoured. + // i.e. output row is null when min_periods exceeds number of observations. + using namespace cudf; + using namespace cudf::test; + + using T = TypeParam; + + auto const time_column = fixed_width_column_wrapper{ + 1, 1, 2, 2, 3, 1, 4, 5, 6}; + auto const group_column = fixed_width_column_wrapper{1, 1, 1, 1, 1, 2, 2, 2, 2}; + auto numeric_member_column = + fixed_width_column_wrapper{10, 11, 12, 13, 14, 20, 21, 22, 23}; + auto string_member_column = + strings_column_wrapper{"10", "11", "12", "13", "14", "20", "21", "22", "23"}; + auto struct_members = std::vector>{}; + struct_members.emplace_back(numeric_member_column.release()); + struct_members.emplace_back(string_member_column.release()); + auto const struct_column = make_structs_column(9, std::move(struct_members), 0, {}); + auto const preceding = 2; + auto const following = 1; + auto const min_periods = 4; + auto const result = + grouped_time_range_rolling_window(table_view{std::vector{group_column}}, + time_column, + cudf::order::ASCENDING, + struct_column->view(), + preceding, + following, + min_periods, + make_collect_aggregation()); + + auto expected_numeric_column = fixed_width_column_wrapper{ + 10, 11, 12, 13, 10, 11, 12, 13, 10, 11, 12, 13, 14, 10, 11, 12, 13, 14, 10, 11, 12, 13, 14}; + + auto expected_string_column = + strings_column_wrapper{"10", "11", "12", "13", "10", "11", "12", "13", "10", "11", "12", "13", + "14", "10", "11", "12", "13", "14", "10", "11", "12", "13", "14"}; + + auto expected_struct_members = std::vector>{}; + expected_struct_members.emplace_back(expected_numeric_column.release()); + expected_struct_members.emplace_back(expected_string_column.release()); + + auto expected_structs_column = make_structs_column(23, std::move(expected_struct_members), 0, {}); + auto expected_offsets_column = + fixed_width_column_wrapper{0, 4, 8, 13, 18, 23, 23, 23, 23, 23}.release(); + auto expected_validity_iter = make_counting_transform_iterator(0, [](auto i) { return i < 5; }); + auto expected_null_mask = + cudf::test::detail::make_null_mask(expected_validity_iter, expected_validity_iter + 9); + auto expected_result = make_lists_column(9, + std::move(expected_offsets_column), + std::move(expected_structs_column), + 4, + std::move(expected_null_mask)); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_result->view(), result->view()); +} + +CUDF_TEST_PROGRAM_MAIN()