diff --git a/cpp/include/cudf/detail/aggregation/aggregation.cuh b/cpp/include/cudf/detail/aggregation/aggregation.cuh index d4833bcf91e..09763d66403 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.cuh +++ b/cpp/include/cudf/detail/aggregation/aggregation.cuh @@ -53,6 +53,14 @@ struct corresponding_operator { using type = DeviceMax; }; template <> +struct corresponding_operator { + using type = DeviceMin; +}; +template <> +struct corresponding_operator { + using type = DeviceMax; +}; +template <> struct corresponding_operator { using type = DeviceMax; }; @@ -81,6 +89,10 @@ struct corresponding_operator { using type = DeviceSum; }; template <> +struct corresponding_operator { + using type = DeviceSum; +}; +template <> struct corresponding_operator { using type = DeviceCount; }; @@ -92,6 +104,12 @@ struct corresponding_operator { template using corresponding_operator_t = typename corresponding_operator::type; +template +constexpr bool has_corresponding_operator() +{ + return !std::is_same::type, void>::value; +} + template -class lead_lag_gather_map_builder { - public: - lead_lag_gather_map_builder(size_type input_size, - size_type row_offset, - PrecedingIterator preceding, - FollowingIterator following) - : _input_size{input_size}, - _null_index{input_size}, // Out of input range. Gather returns null. - _row_offset{row_offset}, - _preceding{preceding}, - _following{following} - { - } - - template - size_type __device__ operator()(size_type i) - { - // Note: grouped_*rolling_window() trims preceding/following to - // the beginning/end of the group. `rolling_window()` does not. - // Must trim _following[i] so as not to go past the column end. - auto following = min(_following[i], _input_size - i - 1); - return (_row_offset > following) ? _null_index : (i + _row_offset); - } - - template - size_type __device__ operator()(size_type i) - { - // Note: grouped_*rolling_window() trims preceding/following to - // the beginning/end of the group. `rolling_window()` does not. - // Must trim _preceding[i] so as not to go past the column start. - auto preceding = min(_preceding[i], i + 1); - return (_row_offset > (preceding - 1)) ? _null_index : (i - _row_offset); - } - - private: - size_type const _input_size; // Number of rows in input to LEAD/LAG. - size_type const _null_index; // Index value to use to output NULL for LEAD/LAG calculation. - size_type const _row_offset; // LEAD/LAG offset. E.g. For LEAD(2), _row_offset == 2. - PrecedingIterator _preceding; // Iterator to retrieve preceding window offset. - FollowingIterator _following; // Iterator to retrieve following window offset. -}; /** * @brief Predicate to find indices at which LEAD/LAG evaluated to null. @@ -110,33 +61,31 @@ is_null_index_predicate_impl is_null_index_predicate(size_type in /** * @brief Helper function to calculate LEAD/LAG for nested-type input columns. * - * @tparam op The sort of aggregation being done (LEAD vs LAG) - * @tparam InputType The datatype of the input column being aggregated * @tparam PrecedingIterator Iterator-type that returns the preceding bounds * @tparam FollowingIterator Iterator-type that returns the following bounds + * @param[in] op Aggregation kind. 
* @param[in] input Nested-type input column for LEAD/LAG calculation * @param[in] default_outputs Default values to use as outputs, if LEAD/LAG * offset crosses column/group boundaries * @param[in] preceding Iterator to retrieve preceding window bounds * @param[in] following Iterator to retrieve following window bounds - * @param[in] offset Lead/Lag offset, indicating which row after/before - * the current row is to be returned + * @param[in] row_offset Lead/Lag offset, indicating which row after/before + * the current row is to be returned * @param[in] stream CUDA stream for device memory operations/allocations * @param[in] mr device_memory_resource for device memory allocations */ -template ())> -std::unique_ptr compute_lead_lag_for_nested(column_view const& input, +template +std::unique_ptr compute_lead_lag_for_nested(aggregation::Kind op, + column_view const& input, column_view const& default_outputs, PrecedingIter preceding, FollowingIter following, - size_type offset, + size_type row_offset, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { + CUDF_EXPECTS(op == aggregation::LEAD || op == aggregation::LAG, + "Unexpected aggregation type in compute_lead_lag_for_nested"); CUDF_EXPECTS(default_outputs.type().id() == input.type().id(), "Defaults column type must match input column."); // Because LEAD/LAG. @@ -145,7 +94,7 @@ std::unique_ptr compute_lead_lag_for_nested(column_view const& input, // For LEAD(0)/LAG(0), no computation need be performed. // Return copy of input. - if (offset == 0) { return std::make_unique(input, stream, mr); } + if (row_offset == 0) { return std::make_unique(input, stream, mr); } // Algorithm: // @@ -174,12 +123,33 @@ std::unique_ptr compute_lead_lag_for_nested(column_view const& input, make_numeric_column(size_data_type, input.size(), mask_state::UNALLOCATED, stream); auto gather_map = gather_map_column->mutable_view(); - thrust::transform(rmm::exec_policy(stream), - thrust::make_counting_iterator(size_type{0}), - thrust::make_counting_iterator(size_type{input.size()}), - gather_map.begin(), - lead_lag_gather_map_builder{ - input.size(), offset, preceding, following}); + auto const input_size = input.size(); + auto const null_index = input.size(); + if (op == aggregation::LEAD) { + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(size_type{0}), + thrust::make_counting_iterator(size_type{input.size()}), + gather_map.begin(), + [following, input_size, null_index, row_offset] __device__(size_type i) { + // Note: grouped_*rolling_window() trims preceding/following to + // the beginning/end of the group. `rolling_window()` does not. + // Must trim _following[i] so as not to go past the column end. + auto _following = min(following[i], input_size - i - 1); + return (row_offset > _following) ? null_index : (i + row_offset); + }); + } else { + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(size_type{0}), + thrust::make_counting_iterator(size_type{input.size()}), + gather_map.begin(), + [preceding, input_size, null_index, row_offset] __device__(size_type i) { + // Note: grouped_*rolling_window() trims preceding/following to + // the beginning/end of the group. `rolling_window()` does not. + // Must trim _preceding[i] so as not to go past the column start. + auto _preceding = min(preceding[i], i + 1); + return (row_offset > (_preceding - 1)) ? 
null_index : (i - row_offset); + }); + } auto output_with_nulls = cudf::detail::gather(table_view{std::vector{input}}, diff --git a/cpp/src/rolling/rolling_collect_list.cuh b/cpp/src/rolling/rolling_collect_list.cuh new file mode 100644 index 00000000000..f5a2e59fd2a --- /dev/null +++ b/cpp/src/rolling/rolling_collect_list.cuh @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace cudf { +namespace detail { + +namespace { +/** + * @brief Creates the offsets child of the result of the `COLLECT_LIST` window aggregation + * + * Given the input column, the preceding/following window bounds, and `min_periods`, + * the sizes of each list row may be computed. These values can then be used to + * calculate the offsets for the result of `COLLECT_LIST`. + * + * Note: If `min_periods` exceeds the number of observations for a window, the size + * is set to `0` (since the result is `null`). + */ +template +std::unique_ptr create_collect_offsets(size_type input_size, + PrecedingIter preceding_begin, + FollowingIter following_begin, + size_type min_periods, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // Materialize offsets column. + auto static constexpr size_data_type = data_type{type_to_id()}; + auto sizes = + make_fixed_width_column(size_data_type, input_size, mask_state::UNALLOCATED, stream, mr); + auto mutable_sizes = sizes->mutable_view(); + + // Consider the following preceding/following values: + // preceding = [1,2,2,2,2] + // following = [1,1,1,1,0] + // The sum of the vectors should yield the window sizes: + // prec + foll = [2,3,3,3,2] + // + // If min_periods=2, all rows have at least `min_periods` observations. + // But if min_periods=3, rows at indices 0 and 4 have too few observations, and must return + // null. The sizes at these positions must be 0, i.e. + // prec + foll = [0,3,3,3,0] + thrust::transform(rmm::exec_policy(stream), + preceding_begin, + preceding_begin + input_size, + following_begin, + mutable_sizes.begin(), + [min_periods] __device__(auto const preceding, auto const following) { + return (preceding + following) < min_periods ? 0 : (preceding + following); + }); + + // Convert `sizes` to an offsets column, via inclusive_scan(): + return strings::detail::make_offsets_child_column( + sizes->view().begin(), sizes->view().end(), stream, mr); +} + +/** + * @brief Generate mapping of each row in the COLLECT_LIST result's child column + * to the index of the row it belongs to. + * + * If + * input col == [A,B,C,D,E] + * and preceding == [1,2,2,2,2], + * and following == [1,1,1,1,0], + * then, + * collect result == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ] + * i.e. result offset column == [0,2,5,8,11,13], + * and result child column == [A,B,A,B,C,B,C,D,C,D,E,D,E]. 
+ * Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4] + */ +std::unique_ptr get_list_child_to_list_row_mapping(cudf::column_view const& offsets, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto static constexpr size_data_type = data_type{type_to_id()}; + + // First, reduce offsets column by key, to identify the number of times + // an offset appears. + // Next, scatter the count for each offset (except the first and last), + // into a column of N `0`s, where N == number of child rows. + // For the example above: + // offsets == [0, 2, 5, 8, 11, 13] + // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] + // + // If the above example had an empty list row at index 2, + // the same columns would look as follows: + // offsets == [0, 2, 5, 5, 8, 11, 13] + // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] + // + // Note: To correctly handle null list rows at the beginning of + // the output column, care must be taken to skip the first `0` + // in the offsets column, when running `reduce_by_key()`. + // This accounts for the `0` added by default to the offsets + // column, marking the beginning of the column. + + auto const num_child_rows{ + cudf::detail::get_value(offsets, offsets.size() - 1, stream)}; + + auto scatter_values = + make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); + auto scatter_keys = + make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); + auto reduced_by_key = + thrust::reduce_by_key(rmm::exec_policy(stream), + offsets.template begin() + 1, // Skip first 0 in offsets. + offsets.template end(), + thrust::make_constant_iterator(1), + scatter_keys->mutable_view().template begin(), + scatter_values->mutable_view().template begin()); + auto scatter_values_end = reduced_by_key.second; + auto scatter_output = + make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); + thrust::fill_n(rmm::exec_policy(stream), + scatter_output->mutable_view().template begin(), + num_child_rows, + 0); // [0,0,0,...0] + thrust::scatter(rmm::exec_policy(stream), + scatter_values->mutable_view().template begin(), + scatter_values_end, + scatter_keys->view().template begin(), + scatter_output->mutable_view().template begin()); // [0,0,1,0,0,1,...] + + // Next, generate mapping with inclusive_scan() on scatter() result. + // For the example above: + // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] + // inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4] + // + // For the case with an empty list at index 3: + // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] + // inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5] + auto per_row_mapping = + make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); + thrust::inclusive_scan(rmm::exec_policy(stream), + scatter_output->view().template begin(), + scatter_output->view().template end(), + per_row_mapping->mutable_view().template begin()); + return per_row_mapping; +} + +/** + * @brief Create gather map to generate the child column of the result of + * the `COLLECT_LIST` window aggregation. 
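+ *
+ * For illustration, continuing the running example (with `preceding` already
+ * clamped to the column bounds):
+ *   offsets         == [0, 2, 5, 8, 11, 13]
+ *   per_row_mapping == [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4]
+ *   preceding       == [1, 2, 2, 2, 2]
+ * Each child row `i` gathers the input index
+ *   (group - preceding[group] + 1) + (i - offsets[group]),
+ * giving the gather map [0,1, 0,1,2, 1,2,3, 2,3,4, 3,4]
+ * (the same "mapping back to input" shown above).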
+ */ +template +std::unique_ptr create_collect_gather_map(column_view const& child_offsets, + column_view const& per_row_mapping, + PrecedingIter preceding_iter, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto gather_map = make_fixed_width_column(data_type{type_to_id()}, + per_row_mapping.size(), + mask_state::UNALLOCATED, + stream, + mr); + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(per_row_mapping.size()), + gather_map->mutable_view().template begin(), + [d_offsets = + child_offsets.template begin(), // E.g. [0, 2, 5, 8, 11, 13] + d_groups = + per_row_mapping.template begin(), // E.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4] + d_prev = preceding_iter] __device__(auto i) { + auto group = d_groups[i]; + auto group_start_offset = d_offsets[group]; + auto relative_index = i - group_start_offset; + + return (group - d_prev[group] + 1) + relative_index; + }); + return gather_map; +} + +/** + * @brief Count null entries in result of COLLECT_LIST. + */ +size_type count_child_nulls(column_view const& input, + std::unique_ptr const& gather_map, + rmm::cuda_stream_view stream) +{ + auto input_device_view = column_device_view::create(input, stream); + + auto input_row_is_null = [d_input = *input_device_view] __device__(auto i) { + return d_input.is_null_nocheck(i); + }; + + return thrust::count_if(rmm::exec_policy(stream), + gather_map->view().template begin(), + gather_map->view().template end(), + input_row_is_null); +} + +/** + * @brief Purge entries for null inputs from gather_map, and adjust offsets. + */ +std::pair, std::unique_ptr> purge_null_entries( + column_view const& input, + column_view const& gather_map, + column_view const& offsets, + size_type num_child_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + auto input_device_view = column_device_view::create(input, stream); + + auto input_row_not_null = [d_input = *input_device_view] __device__(auto i) { + return d_input.is_valid_nocheck(i); + }; + + // Purge entries in gather_map that correspond to null input. + auto new_gather_map = make_fixed_width_column(data_type{type_to_id()}, + gather_map.size() - num_child_nulls, + mask_state::UNALLOCATED, + stream, + mr); + thrust::copy_if(rmm::exec_policy(stream), + gather_map.template begin(), + gather_map.template end(), + new_gather_map->mutable_view().template begin(), + input_row_not_null); + + // Recalculate offsets after null entries are purged. 
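+  // For each input row i, the new size is the number of entries in
+  // gather_map[offsets[i], offsets[i+1]) that point at non-null input rows;
+  // the new offsets column is then rebuilt from these sizes via an inclusive
+  // scan (make_offsets_child_column below).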
+ auto new_sizes = make_fixed_width_column( + data_type{type_to_id()}, input.size(), mask_state::UNALLOCATED, stream, mr); + + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.size()), + new_sizes->mutable_view().template begin(), + [d_gather_map = gather_map.template begin(), + d_old_offsets = offsets.template begin(), + input_row_not_null] __device__(auto i) { + return thrust::count_if(thrust::seq, + d_gather_map + d_old_offsets[i], + d_gather_map + d_old_offsets[i + 1], + input_row_not_null); + }); + + auto new_offsets = + strings::detail::make_offsets_child_column(new_sizes->view().template begin(), + new_sizes->view().template end(), + stream, + mr); + + return std::make_pair, std::unique_ptr>(std::move(new_gather_map), + std::move(new_offsets)); +} + +} // anonymous namespace + +template +std::unique_ptr rolling_collect_list(column_view const& input, + column_view const& default_outputs, + PrecedingIter preceding_begin_raw, + FollowingIter following_begin_raw, + size_type min_periods, + rolling_aggregation const& agg, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(default_outputs.is_empty(), + "COLLECT_LIST window function does not support default values."); + + if (input.is_empty()) return empty_like(input); + + // Fix up preceding/following iterators to respect column boundaries, + // similar to gpu_rolling(). + // `rolling_window()` does not fix up preceding/following so as not to read past + // column boundaries. + // `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do. + auto preceding_begin = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), [preceding_begin_raw] __device__(auto i) { + return thrust::min(preceding_begin_raw[i], i + 1); + }); + auto following_begin = + thrust::make_transform_iterator(thrust::make_counting_iterator(0), + [following_begin_raw, size = input.size()] __device__(auto i) { + return thrust::min(following_begin_raw[i], size - i - 1); + }); + + // Materialize collect list's offsets. + auto offsets = + create_collect_offsets(input.size(), preceding_begin, following_begin, min_periods, stream, mr); + + // Map each element of the collect() result's child column + // to the index where it appears in the input. + auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr); + + // Generate gather map to produce the collect() result's child column. + auto gather_map = create_collect_gather_map( + offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr); + + // If gather_map collects null elements, and null_policy == EXCLUDE, + // those elements must be filtered out, and offsets recomputed. + auto null_handling = dynamic_cast(agg)._null_handling; + if (null_handling == null_policy::EXCLUDE && input.has_nulls()) { + auto num_child_nulls = count_child_nulls(input, gather_map, stream); + if (num_child_nulls != 0) { + std::tie(gather_map, offsets) = + purge_null_entries(input, *gather_map, *offsets, num_child_nulls, stream, mr); + } + } + + // gather(), to construct child column. 
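+  // Note: the output list column's null mask is computed separately below via
+  // valid_if(); a row is valid only if (preceding[i] + following[i]) >= min_periods,
+  // mirroring the sizes-to-offsets logic in create_collect_offsets().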
+ auto gather_output = + cudf::gather(table_view{std::vector{input}}, gather_map->view()); + + rmm::device_buffer null_mask; + size_type null_count; + std::tie(null_mask, null_count) = valid_if( + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.size()), + [preceding_begin, following_begin, min_periods] __device__(auto i) { + return (preceding_begin[i] + following_begin[i]) >= min_periods; + }, + stream, + mr); + + return make_lists_column(input.size(), + std::move(offsets), + std::move(gather_output->release()[0]), + null_count, + std::move(null_mask), + stream, + mr); +} + +} // namespace detail +} // namespace cudf diff --git a/cpp/src/rolling/rolling_detail.cuh b/cpp/src/rolling/rolling_detail.cuh index 1192b9cad87..9e6d135b153 100644 --- a/cpp/src/rolling/rolling_detail.cuh +++ b/cpp/src/rolling/rolling_detail.cuh @@ -17,6 +17,9 @@ #pragma once #include "lead_lag_nested_detail.cuh" +#include "rolling/rolling_collect_list.cuh" +#include "rolling/rolling_detail.hpp" +#include "rolling/rolling_jit_detail.hpp" #include "rolling_detail.hpp" #include @@ -28,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -64,375 +66,695 @@ namespace cudf { namespace detail { + namespace { // anonymous + /** - * @brief Only COUNT_VALID operation is executed and count is updated - * depending on `min_periods` and returns true if it was - * valid, else false. + * @brief Operator for applying a generic (non-specialized) rolling aggregation on a single window. */ -template * = nullptr> -bool __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) -{ - // declare this as volatile to avoid some compiler optimizations that lead to incorrect results - // for CUDA 10.0 and below (fixed in CUDA 10.1) - volatile cudf::size_type count = 0; +template +struct DeviceRolling { + size_type min_periods; - bool output_is_valid = ((end_index - start_index) >= min_periods); + // what operations do we support + template + static constexpr bool is_supported() + { + return cudf::detail::is_valid_aggregation() && has_corresponding_operator() && + // TODO: Delete all this extra logic once is_valid_aggregation<> cleans up some edge + // cases it isn't handling. 
+ // MIN/MAX supports all fixed width types + (((O == aggregation::MIN || O == aggregation::MAX) && cudf::is_fixed_width()) || - if (output_is_valid) { - if (!has_nulls) { - count = end_index - start_index; - } else { - count = thrust::count_if(thrust::seq, - thrust::make_counting_iterator(start_index), - thrust::make_counting_iterator(end_index), - [&input](auto i) { return input.is_valid_nocheck(i); }); + // SUM supports all fixed width types except timestamps + ((O == aggregation::SUM) && (cudf::is_fixed_width() && !cudf::is_timestamp())) || + + // MEAN supports numeric and duration + ((O == aggregation::MEAN) && (cudf::is_numeric() || cudf::is_duration()))); + } + + // operations we do support + template + DeviceRolling(size_type _min_periods, typename std::enable_if_t()>* = nullptr) + : min_periods(_min_periods) + { + } + + // operations we don't support + template + DeviceRolling(size_type _min_periods, typename std::enable_if_t()>* = nullptr) + : min_periods(_min_periods) + { + CUDF_FAIL("Invalid aggregation/type pair"); + } + + // perform the windowing operation + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& ignored_default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + using AggOp = typename corresponding_operator::type; + AggOp agg_op; + + // declare this as volatile to avoid some compiler optimizations that lead to incorrect results + // for CUDA 10.0 and below (fixed in CUDA 10.1) + volatile cudf::size_type count = 0; + OutputType val = AggOp::template identity(); + + for (size_type j = start_index; j < end_index; j++) { + if (!has_nulls || input.is_valid(j)) { + OutputType element = input.element>(j); + val = agg_op(element, val); + count++; + } + } + + bool output_is_valid = (count >= min_periods); + + // store the output value, one per thread + cudf::detail::rolling_store_output_functor{}( + output.element(current_index), val, count); + + return output_is_valid; + } +}; + +/** + * @brief Operator for applying an ARGMAX/ARGMIN rolling aggregation on a single window. + */ +template +struct DeviceRollingArgMinMax { + size_type min_periods; + + // what operations do we support + template + static constexpr bool is_supported() + { + // strictly speaking, I think it would be ok to make this work + // for comparable types as well. but right now the only use case is + // for MIN/MAX on strings. + return std::is_same::value; + } + + DeviceRollingArgMinMax(size_type _min_periods) : min_periods(_min_periods) {} + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& ignored_default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + using AggOp = typename corresponding_operator::type; + AggOp agg_op; + + // declare this as volatile to avoid some compiler optimizations that lead to incorrect results + // for CUDA 10.0 and below (fixed in CUDA 10.1) + volatile cudf::size_type count = 0; + InputType val = AggOp::template identity(); + OutputType val_index = (op == aggregation::ARGMIN) ? 
ARGMIN_SENTINEL : ARGMAX_SENTINEL; + + for (size_type j = start_index; j < end_index; j++) { + if (!has_nulls || input.is_valid(j)) { + InputType element = input.element(j); + val = agg_op(element, val); + if (val == element) { val_index = j; } + count++; + } + } + + bool output_is_valid = (count >= min_periods); + // -1 will help identify null elements while gathering for Min and Max + // In case of count, this would be null, so doesn't matter. + output.element(current_index) = (output_is_valid) ? val_index : -1; + + // The gather mask shouldn't contain null values, so + // always return zero + return true; + } +}; + +/** + * @brief Operator for applying a COUNT_VALID rolling aggregation on a single window. + */ +template +struct DeviceRollingCountValid { + size_type min_periods; + + // what operations do we support + template + static constexpr bool is_supported() + { + return true; + } + + DeviceRollingCountValid(size_type _min_periods) : min_periods(_min_periods) {} + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& ignored_default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + // declare this as volatile to avoid some compiler optimizations that lead to incorrect results + // for CUDA 10.0 and below (fixed in CUDA 10.1) + volatile cudf::size_type count = 0; + + bool output_is_valid = ((end_index - start_index) >= min_periods); + + if (output_is_valid) { + if (!has_nulls) { + count = end_index - start_index; + } else { + count = thrust::count_if(thrust::seq, + thrust::make_counting_iterator(start_index), + thrust::make_counting_iterator(end_index), + [&input](auto i) { return input.is_valid_nocheck(i); }); + } + output.element(current_index) = count; } + + return output_is_valid; + } +}; + +/** + * @brief Operator for applying a COUNT_ALL rolling aggregation on a single window. + */ +template +struct DeviceRollingCountAll { + size_type min_periods; + + // what operations do we support + template + static constexpr bool is_supported() + { + return true; + } + + DeviceRollingCountAll(size_type _min_periods) : min_periods(_min_periods) {} + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& ignored_default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + cudf::size_type count = end_index - start_index; + + bool output_is_valid = count >= min_periods; output.element(current_index) = count; + + return output_is_valid; } +}; - return output_is_valid; -} +/** + * @brief Operator for applying a ROW_NUMBER rolling aggregation on a single window. 
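+ *
+ * Writes the 1-based position of the row within its window, i.e.
+ * `current_index - start_index + 1`. E.g. for preceding = [1,2,2,2,2] and
+ * following = [1,1,1,1,0], ROW_NUMBER yields [1,2,2,2,2] (rows whose windows
+ * hold fewer than `min_periods` observations are null).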
+ */ +template +struct DeviceRollingRowNumber { + size_type min_periods; + + // what operations do we support + template + static constexpr bool is_supported() + { + return true; + } + + DeviceRollingRowNumber(size_type _min_periods) : min_periods(_min_periods) {} + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& ignored_default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + bool output_is_valid = end_index - start_index >= min_periods; + output.element(current_index) = current_index - start_index + 1; + + return output_is_valid; + } +}; /** - * @brief Only COUNT_ALL operation is executed and count is updated - * depending on `min_periods` and returns true if it was - * valid, else false. + * @brief Operator for applying a LEAD rolling aggregation on a single window. */ -template * = nullptr> -bool __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) -{ - cudf::size_type count = end_index - start_index; +template +struct DeviceRollingLead { + size_type row_offset; - bool output_is_valid = count >= min_periods; - output.element(current_index) = count; + // what operations do we support + template + static constexpr bool is_supported() + { + return cudf::is_fixed_width(); + } - return output_is_valid; -} + template ()>* = nullptr> + DeviceRollingLead(size_type _row_offset) : row_offset(_row_offset) + { + } + + template ()>* = nullptr> + DeviceRollingLead(size_type _row_offset) : row_offset(_row_offset) + { + CUDF_FAIL("Invalid aggregation/type pair"); + } + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + // Offsets have already been normalized. + + // Check if row is invalid. + if (row_offset > (end_index - current_index - 1)) { + // Invalid row marked. Use default value, if available. + if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } + + output.element(current_index) = + default_outputs.element(current_index); + return true; + } + + // Not an invalid row. + auto index = current_index + row_offset; + auto is_null = input.is_null(index); + if (!is_null) { + output.element(current_index) = + input.element>(index); + } + return !is_null; + } +}; /** - * @brief Calculates row-number of current index within [start_index, end_index). Count is updated - * depending on `min_periods`. Returns `true` if it was valid, else `false`. + * @brief Operator for applying a LAG rolling aggregation on a single window. 
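+ *
+ * E.g. for the grouped input [10, 11, 12, 13, 20, 21, 22, 23] (two groups of four),
+ * LAG(input, 2) yields [null, null, 10, 11, null, null, 20, 21], and
+ * LAG(input, 2, 99) yields [99, 99, 10, 11, 99, 99, 20, 21].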
*/ -template * = nullptr> -bool __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) -{ - bool output_is_valid = end_index - start_index >= min_periods; - output.element(current_index) = current_index - start_index + 1; +template +struct DeviceRollingLag { + size_type row_offset; - return output_is_valid; -} + // what operations do we support + template + static constexpr bool is_supported() + { + return cudf::is_fixed_width(); + } + + template ()>* = nullptr> + DeviceRollingLag(size_type _row_offset) : row_offset(_row_offset) + { + } + + template ()>* = nullptr> + DeviceRollingLag(size_type _row_offset) : row_offset(_row_offset) + { + CUDF_FAIL("Invalid aggregation/type pair"); + } + + template + bool __device__ operator()(column_device_view const& input, + column_device_view const& default_outputs, + mutable_column_device_view& output, + size_type start_index, + size_type end_index, + size_type current_index) + { + // Offsets have already been normalized. + + // Check if row is invalid. + if (row_offset > (current_index - start_index)) { + // Invalid row marked. Use default value, if available. + if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } + + output.element(current_index) = + default_outputs.element(current_index); + return true; + } + + // Not an invalid row. + auto index = current_index - row_offset; + auto is_null = input.is_null(index); + if (!is_null) { + output.element(current_index) = + input.element>(index); + } + return !is_null; + } +}; /** - * @brief LEAD(N): Returns the row from the input column, at the specified offset past the - * current row. - * If the offset crosses the grouping boundary or column boundary for - * a given row, a "default" value is returned. The "default" value is null, by default. + * @brief Maps an `InputType and `aggregation::Kind` value to it's corresponding + * rolling window operator. * - * E.g. Consider an input column with the following values and grouping: - * [10, 11, 12, 13, 20, 21, 22, 23] - * <------G1-----> <------G2------> - * - * LEAD(input_col, 1) yields: - * [11, 12, 13, null, 21, 22, 23, null] - * - * LEAD(input_col, 1, 99) (where 99 indicates the default) yields: - * [11, 12, 13, 99, 21, 22, 23, 99] + * @tparam InputType The input type to map to its corresponding operator + * @tparam k The `aggregation::Kind` value to map to its corresponding operator */ -template -std::enable_if_t<(op == aggregation::LEAD) && (cudf::is_fixed_width()), bool> __device__ -process_rolling_window(column_device_view input, - column_device_view default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods, - agg_op device_agg_op) -{ - // Offsets have already been normalized. - auto row_offset = device_agg_op.row_offset; +template +struct corresponding_rolling_operator { + using type = DeviceRolling; +}; - // Check if row is invalid. - if (row_offset > (end_index - current_index - 1)) { - // Invalid row marked. Use default value, if available. 
- if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } +template +struct corresponding_rolling_operator { + using type = DeviceRollingArgMinMax; +}; - output.element(current_index) = default_outputs.element(current_index); - return true; +template +struct corresponding_rolling_operator { + using type = DeviceRollingArgMinMax; +}; + +template +struct corresponding_rolling_operator { + using type = DeviceRollingCountValid; +}; + +template +struct corresponding_rolling_operator { + using type = DeviceRollingCountAll; +}; + +template +struct corresponding_rolling_operator { + using type = DeviceRollingRowNumber; +}; + +template +struct corresponding_rolling_operator { + using type = DeviceRollingLead; +}; + +template +struct corresponding_rolling_operator { + using type = DeviceRollingLag; +}; + +/** + * @brief Functor for creating a device rolling operator based on input type and aggregation type. + */ +template +struct create_rolling_operator { + auto operator()(size_type min_periods, rolling_aggregation const& agg) + { + CUDF_FAIL("Invalid aggregation/type pair"); } +}; - // Not an invalid row. - auto index = current_index + row_offset; - auto is_null = input.is_null(index); - if (!is_null) { output.element(current_index) = input.element(index); } - return !is_null; -} +template +struct create_rolling_operator< + InputType, + op, + std::enable_if_t::type::is_supported()>> { + template < + typename T = InputType, + aggregation::Kind O = op, + std::enable_if_t* = nullptr> + auto operator()(size_type min_periods, rolling_aggregation const& agg) + { + return typename corresponding_rolling_operator::type(min_periods); + } + + template * = nullptr> + auto operator()(size_type min_periods, rolling_aggregation const& agg) + { + return DeviceRollingLead{ + dynamic_cast(agg).row_offset}; + } + + template * = nullptr> + auto operator()(size_type min_periods, rolling_aggregation const& agg) + { + return DeviceRollingLag{ + dynamic_cast(agg).row_offset}; + } +}; /** - * @brief LAG(N): returns the row from the input column at the specified offset preceding - * the current row. - * If the offset crosses the grouping boundary or column boundary for - * a given row, a "default" value is returned. The "default" value is null, by default. + * @brief Rolling window specific implementation of simple_aggregations_collector. + * + * The purpose of this class is to preprocess incoming aggregation/type pairs and + * potentially transform them into other aggregation/type pairs. Typically when this + * happens, the equivalent aggregation/type implementation of finalize() will perform + * some postprocessing step. + * + * An example of this would be applying a MIN aggregation to strings. This cannot be done + * directly in the rolling operation, so instead the following happens: * - * E.g. Consider an input column with the following values and grouping: - * [10, 11, 12, 13, 20, 21, 22, 23] - * <------G1-----> <------G2------> + * - the rolling_aggregation_preprocessor transforms the incoming MIN/string pair to + * an ARGMIN/int pair. + * - The ARGMIN/int has the rolling operation applied to it, generating a list of indices + * that can then be used as a gather map. + * - The rolling_aggregation_postprocessor then takes this gather map and performs a final + * gather() on the input string data to generate the final output. + * + * Another example is COLLECT_LIST. COLLECT_LIST is odd in that it doesn't go through the + * normal gpu rolling kernel at all. 
It has a completely custom implementation. So the + * following happens: + * + * - the rolling_aggregation_preprocessor transforms the COLLECT_LIST aggregation into nothing, + * since no actual rolling window operation will be performed. + * - the rolling_aggregation_postprocessor calls the specialized rolling_collect_list() + * function to generate the final output. * - * LAG(input_col, 2) yields: - * [null, null, 10, 11, null, null, 20, 21] - * LAG(input_col, 2, 99) yields: - * [99, 99, 10, 11, 99, 99, 20, 21] */ -template -std::enable_if_t<(op == aggregation::LAG) && (cudf::is_fixed_width()), bool> __device__ -process_rolling_window(column_device_view input, - column_device_view default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods, - agg_op device_agg_op) -{ - // Offsets have already been normalized. - auto row_offset = device_agg_op.row_offset; +class rolling_aggregation_preprocessor final : public cudf::detail::simple_aggregations_collector { + public: + using cudf::detail::simple_aggregations_collector::visit; + + // NOTE : all other aggregations are passed through unchanged via the default + // visit() function in the simple_aggregations_collector. + + // MIN aggregations with strings are processed in 2 passes. The first pass performs + // the rolling operation on a ARGMIN aggregation to generate indices instead of values. + // Then a second pass uses those indices to gather the final strings. This step + // translates the the MIN -> ARGMIN aggregation + std::vector> visit(data_type col_type, + cudf::detail::min_aggregation const& agg) override + { + std::vector> aggs; + aggs.push_back(col_type.id() == type_id::STRING ? make_argmin_aggregation() + : make_min_aggregation()); + return aggs; + } - // Check if row is invalid. - if (row_offset > (current_index - start_index)) { - // Invalid row marked. Use default value, if available. - if (default_outputs.size() == 0 || default_outputs.is_null(current_index)) { return false; } + // MAX aggregations with strings are processed in 2 passes. The first pass performs + // the rolling operation on a ARGMAX aggregation to generate indices instead of values. + // Then a second pass uses those indices to gather the final strings. This step + // translates the the MAX -> ARGMAX aggregation + std::vector> visit(data_type col_type, + cudf::detail::max_aggregation const& agg) override + { + std::vector> aggs; + aggs.push_back(col_type.id() == type_id::STRING ? make_argmax_aggregation() + : make_max_aggregation()); + return aggs; + } - output.element(current_index) = default_outputs.element(current_index); - return true; + // COLLECT_LIST aggregations do not peform a rolling operation at all. They get processed + // entirely in the finalize() step. + std::vector> visit( + data_type col_type, cudf::detail::collect_list_aggregation const& agg) override + { + return {}; } - // Not an invalid row. - auto index = current_index - row_offset; - auto is_null = input.is_null(index); - if (!is_null) { output.element(current_index) = input.element(index); } - return !is_null; -} + // LEAD and LAG have custom behaviors for non fixed-width types. + std::vector> visit( + data_type col_type, cudf::detail::lead_lag_aggregation const& agg) override + { + // no rolling operation for non-fixed width. 
just a postprocess step at the end + if (!cudf::is_fixed_width(col_type)) { return {}; } + // otherwise, pass through + std::vector> aggs; + aggs.push_back(agg.clone()); + return aggs; + } +}; /** - * @brief Only used for `string_view` type to get ARGMIN and ARGMAX, which - * will be used to gather MIN and MAX. And returns true if the - * operation was valid, else false. + * @brief Rolling window specific implementation of aggregation_finalizer. + * + * The purpose of this class is to postprocess rolling window data depending on the + * aggregation/type pair. See the description of rolling_aggregation_preprocessor for + * a detailed description. + * */ -template ::value>* = nullptr> -bool __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) -{ - // declare this as volatile to avoid some compiler optimizations that lead to incorrect results - // for CUDA 10.0 and below (fixed in CUDA 10.1) - volatile cudf::size_type count = 0; - InputType val = agg_op::template identity(); - OutputType val_index = (op == aggregation::ARGMIN) ? ARGMIN_SENTINEL : ARGMAX_SENTINEL; - - for (size_type j = start_index; j < end_index; j++) { - if (!has_nulls || input.is_valid(j)) { - InputType element = input.element(j); - val = agg_op{}(element, val); - if (val == element) { val_index = j; } - count++; - } +template +class rolling_aggregation_postprocessor final : public cudf::detail::aggregation_finalizer { + public: + using cudf::detail::aggregation_finalizer::visit; + + rolling_aggregation_postprocessor(column_view const& _input, + column_view const& _default_outputs, + data_type _result_type, + PrecedingWindowIterator _preceding_window_begin, + FollowingWindowIterator _following_window_begin, + int _min_periods, + std::unique_ptr&& _intermediate, + rmm::cuda_stream_view _stream, + rmm::mr::device_memory_resource* _mr) + : + + input(_input), + default_outputs(_default_outputs), + result_type(_result_type), + preceding_window_begin(_preceding_window_begin), + following_window_begin(_following_window_begin), + min_periods(_min_periods), + intermediate(std::move(_intermediate)), + result(nullptr), + stream(_stream), + mr(_mr) + { } - bool output_is_valid = (count >= min_periods); - // -1 will help identify null elements while gathering for Min and Max - // In case of count, this would be null, so doesn't matter. - output.element(current_index) = (output_is_valid) ? val_index : -1; + // all non-specialized aggregation types simply pass the intermediate result through. + void visit(aggregation const& agg) override { result = std::move(intermediate); } - // The gather mask shouldn't contain null values, so - // always return zero - return true; -} + // perform a final gather on the generated ARGMIN data + void visit(cudf::detail::min_aggregation const& agg) override + { + if (result_type.id() == type_id::STRING) { + // The rows that represent null elements will have negative values in gather map, + // and that's why nullify_out_of_bounds/ignore_out_of_bounds is true. 
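+      // (DeviceRollingArgMinMax writes -1 for windows with fewer than min_periods
+      // observations, so out_of_bounds_policy::NULLIFY below turns those entries
+      // into nulls in the gathered strings.)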
+ auto output_table = detail::gather(table_view{{input}}, + intermediate->view(), + cudf::out_of_bounds_policy::NULLIFY, + detail::negative_index_policy::NOT_ALLOWED, + stream, + mr); + result = std::make_unique(std::move(output_table->get_column(0))); + } else { + result = std::move(intermediate); + } + } -/** - * @brief Operates on only fixed-width types and returns true if the - * operation was valid, else false. - */ -template ::value and - !(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL || - op == aggregation::ROW_NUMBER || op == aggregation::LEAD || - op == aggregation::LAG || op == aggregation::COLLECT_LIST)>* = nullptr> -bool __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) -{ - // declare this as volatile to avoid some compiler optimizations that lead to incorrect results - // for CUDA 10.0 and below (fixed in CUDA 10.1) - volatile cudf::size_type count = 0; - OutputType val = agg_op::template identity(); - - for (size_type j = start_index; j < end_index; j++) { - if (!has_nulls || input.is_valid(j)) { - OutputType element = input.element(j); - val = agg_op{}(element, val); - count++; + // perform a final gather on the generated ARGMAX data + void visit(cudf::detail::max_aggregation const& agg) override + { + if (result_type.id() == type_id::STRING) { + // The rows that represent null elements will have negative values in gather map, + // and that's why nullify_out_of_bounds/ignore_out_of_bounds is true. + auto output_table = detail::gather(table_view{{input}}, + intermediate->view(), + cudf::out_of_bounds_policy::NULLIFY, + detail::negative_index_policy::NOT_ALLOWED, + stream, + mr); + result = std::make_unique(std::move(output_table->get_column(0))); + } else { + result = std::move(intermediate); } } - bool output_is_valid = (count >= min_periods); + // perform the actual COLLECT_LIST operation entirely. + void visit(cudf::detail::collect_list_aggregation const& agg) override + { + result = rolling_collect_list(input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + agg, + stream, + mr); + } - // store the output value, one per thread - cudf::detail::rolling_store_output_functor{}( - output.element(current_index), val, count); + std::unique_ptr get_result() + { + CUDF_EXPECTS(result != nullptr, + "Calling result on rolling aggregation postprocessor that has not been visited in " + "rolling_window"); + return std::move(result); + } - return output_is_valid; -} + // LEAD and LAG have custom behaviors for non fixed-width types. 
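+  // Fixed-width LEAD/LAG results were already produced by gpu_rolling() and arrive
+  // here as `intermediate`; nested and other non-fixed-width inputs are instead
+  // handled by the dedicated compute_lead_lag_for_nested() path below.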
+ void visit(cudf::detail::lead_lag_aggregation const& agg) override + { + // if this is non-fixed width, run the custom lead-lag code + if (!cudf::is_fixed_width(result_type)) { + result = + cudf::detail::compute_lead_lag_for_nested( + agg.kind, + input, + default_outputs, + preceding_window_begin, + following_window_begin, + agg.row_offset, + stream, + mr); + } + // otherwise just pass through the intermediate + else { + result = std::move(intermediate); + } + } + + private: + column_view input; + column_view default_outputs; + data_type result_type; + PrecedingWindowIterator preceding_window_begin; + FollowingWindowIterator following_window_begin; + int min_periods; + std::unique_ptr intermediate; + std::unique_ptr result; + rmm::cuda_stream_view stream; + rmm::mr::device_memory_resource* mr; +}; /** * @brief Computes the rolling window function * * @tparam InputType Datatype of `input` * @tparam OutputType Datatype of `output` - * @tparam agg_op A functor that defines the aggregation operation * @tparam op The aggregation operator (enum value) * @tparam block_size CUDA block size for the kernel * @tparam has_nulls true if the input column has nulls + * @tparam DeviceRollingOperator An operator that performs a single windowing operation * @tparam PrecedingWindowIterator iterator type (inferred) * @tparam FollowingWindowIterator iterator type (inferred) * @param input Input column device view + * @param default_outputs A column of per-row default values to be returned instead + * of nulls for certain aggregation types. * @param output Output column device view + * @param output_valid_count Output count of valid values + * @param device_operator The operator used to perform a single window operation * @param preceding_window_begin[in] Rolling window size iterator, accumulates from * in_col[i-preceding_window] to in_col[i] inclusive * @param following_window_begin[in] Rolling window size iterator in the forward * direction, accumulates from in_col[i] to * in_col[i+following_window] inclusive - * @param min_periods[in] Minimum number of observations in window required to - * have a value, otherwise 0 is stored in the valid bit mask */ template -__launch_bounds__(block_size) __global__ - void gpu_rolling(column_device_view input, - column_device_view default_outputs, - mutable_column_device_view output, - size_type* __restrict__ output_valid_count, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods) -{ - size_type i = blockIdx.x * block_size + threadIdx.x; - size_type stride = block_size * gridDim.x; - - size_type warp_valid_count{0}; - - auto active_threads = __ballot_sync(0xffffffff, i < input.size()); - while (i < input.size()) { - size_type preceding_window = preceding_window_begin[i]; - size_type following_window = following_window_begin[i]; - - // compute bounds - size_type start = min(input.size(), max(0, i - preceding_window + 1)); - size_type end = min(input.size(), max(0, i + following_window + 1)); - size_type start_index = min(start, end); - size_type end_index = max(start, end); - - // aggregate - // TODO: We should explore using shared memory to avoid redundant loads. - // This might require separating the kernel into a special version - // for dynamic and static sizes. 
- - volatile bool output_is_valid = false; - output_is_valid = process_rolling_window( - input, default_outputs, output, start_index, end_index, i, min_periods); - - // set the mask - cudf::bitmask_type result_mask{__ballot_sync(active_threads, output_is_valid)}; - - // only one thread writes the mask - if (0 == threadIdx.x % cudf::detail::warp_size) { - output.set_mask_word(cudf::word_index(i), result_mask); - warp_valid_count += __popc(result_mask); - } - - // process next element - i += stride; - active_threads = __ballot_sync(active_threads, i < input.size()); - } - - // sum the valid counts across the whole block - size_type block_valid_count = - cudf::detail::single_lane_block_sum_reduce(warp_valid_count); - - if (threadIdx.x == 0) { atomicAdd(output_valid_count, block_valid_count); } -} - -template __launch_bounds__(block_size) __global__ @@ -440,10 +762,9 @@ __launch_bounds__(block_size) __global__ column_device_view default_outputs, mutable_column_device_view output, size_type* __restrict__ output_valid_count, + DeviceRollingOperator device_operator, PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - agg_op device_agg_op) + FollowingWindowIterator following_window_begin) { size_type i = blockIdx.x * block_size + threadIdx.x; size_type stride = block_size * gridDim.x; @@ -467,8 +788,8 @@ __launch_bounds__(block_size) __global__ // for dynamic and static sizes. volatile bool output_is_valid = false; - output_is_valid = process_rolling_window( - input, default_outputs, output, start_index, end_index, i, min_periods, device_agg_op); + output_is_valid = device_operator.template operator()( + input, default_outputs, output, start_index, end_index, i); // set the mask cudf::bitmask_type result_mask{__ballot_sync(active_threads, output_is_valid)}; @@ -491,726 +812,108 @@ __launch_bounds__(block_size) __global__ if (threadIdx.x == 0) { atomicAdd(output_valid_count, block_valid_count); } } +/** + * @brief Type/aggregation dispatched functor for launching the gpu rolling window + * kernel. 
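+ *
+ * For supported (input type, aggregation) pairs, operator() builds the device
+ * operator via create_rolling_operator, allocates the output column, and launches
+ * gpu_rolling() (with or without null handling, depending on input.has_nulls());
+ * a second overload covers unsupported pairs.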
+ */ template struct rolling_window_launcher { - template - size_type kernel_launcher(column_view const& input, - column_view const& default_outputs, - mutable_column_view& output, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream) - { - using Type = device_storage_type_t; - using OutType = device_storage_type_t>; - - constexpr cudf::size_type block_size = 256; - cudf::detail::grid_1d grid(input.size(), block_size); - - auto input_device_view = column_device_view::create(input, stream); - auto output_device_view = mutable_column_device_view::create(output, stream); - auto default_outputs_device_view = column_device_view::create(default_outputs, stream); - - rmm::device_scalar device_valid_count{0, stream}; - - if (input.has_nulls()) { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - preceding_window_begin, - following_window_begin, - min_periods); - } else { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - preceding_window_begin, - following_window_begin, - min_periods); - } - - size_type valid_count = device_valid_count.value(stream); - - // check the stream for debugging - CHECK_CUDA(stream.value()); - - return valid_count; - } - - template - size_type kernel_launcher(column_view const& input, - column_view const& default_outputs, - mutable_column_view& output, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - agg_op const& device_agg_op, - rmm::cuda_stream_view stream) - { - using Type = device_storage_type_t; - using OutType = device_storage_type_t>; - - constexpr cudf::size_type block_size = 256; - cudf::detail::grid_1d grid(input.size(), block_size); - - auto input_device_view = column_device_view::create(input, stream); - auto output_device_view = mutable_column_device_view::create(output, stream); - auto default_outputs_device_view = column_device_view::create(default_outputs, stream); - - rmm::device_scalar device_valid_count{0, stream}; - - if (input.has_nulls()) { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - preceding_window_begin, - following_window_begin, - min_periods, - device_agg_op); - } else { - gpu_rolling - <<>>(*input_device_view, - *default_outputs_device_view, - *output_device_view, - device_valid_count.data(), - preceding_window_begin, - following_window_begin, - min_periods, - device_agg_op); - } - - size_type valid_count = device_valid_count.value(stream); - - // check the stream for debugging - CHECK_CUDA(stream.value()); - - return valid_count; - } - - // This launch is only for fixed width columns with valid aggregation option - // numeric: All - // timestamp: MIN, MAX, COUNT_VALID, COUNT_ALL, ROW_NUMBER - // string, dictionary, list : COUNT_VALID, COUNT_ALL, ROW_NUMBER - template - std::enable_if_t() and - !cudf::detail::is_rolling_string_specialization(), - std::unique_ptr> - launch(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - 
auto output = make_fixed_width_column( - target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); - - cudf::mutable_column_view output_view = output->mutable_view(); - auto valid_count = - kernel_launcher( - input, - default_outputs, - output_view, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream); - - output->set_null_count(output->size() - valid_count); - - return output; - } - - // This launch is only for string specializations - // string: MIN, MAX - template - std::enable_if_t(), - std::unique_ptr> - launch(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - auto output = make_numeric_column(cudf::data_type{cudf::type_to_id()}, - input.size(), - cudf::mask_state::UNINITIALIZED, - stream, - mr); - - cudf::mutable_column_view output_view = output->mutable_view(); - - // Passing the agg_op and aggregation::Kind as constant to group them in pair, else it - // evolves to error when try to use agg_op as compiler tries different combinations - if (op == aggregation::MIN) { - kernel_launcher(input, - default_outputs, - output_view, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream); - } else if (op == aggregation::MAX) { - kernel_launcher(input, - default_outputs, - output_view, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream); - } else { - CUDF_FAIL("MIN and MAX are the only supported aggregation types for string columns"); - } - - // The rows that represent null elements will be having negative values in gather map, - // and that's why nullify_out_of_bounds/ignore_out_of_bounds is true. 
- auto output_table = detail::gather(table_view{{input}}, - output->view(), - cudf::out_of_bounds_policy::NULLIFY, - detail::negative_index_policy::NOT_ALLOWED, - stream, - mr); - return std::make_unique(std::move(output_table->get_column(0))); - } - - // Deals with invalid column and/or aggregation options - template - std::enable_if_t() and - !cudf::detail::is_rolling_string_specialization(), - std::unique_ptr> - launch(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - CUDF_FAIL("Aggregation operator and/or input type combination is invalid"); - } - - template - std::enable_if_t() and - (op == aggregation::LEAD || op == aggregation::LAG), - std::unique_ptr> - launch(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding, - FollowingWindowIterator following, - size_type min_periods, - rolling_aggregation const& agg, - agg_op const& device_agg_op, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - auto output = make_fixed_width_column( - target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr); - - cudf::mutable_column_view output_view = output->mutable_view(); - auto valid_count = - kernel_launcher( - input, - default_outputs, - output_view, - preceding, - following, - min_periods, - agg, - device_agg_op, - stream); - - output->set_null_count(output->size() - valid_count); - - return output; - } - template - std::enable_if_t::type::is_supported(), std::unique_ptr> operator()(column_view const& input, column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, - size_type min_periods, + int min_periods, rolling_aggregation const& agg, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(default_outputs.is_empty(), - "Only LEAD/LAG window functions support default values."); + auto const output_type = target_type(input.type(), op); + auto device_operator = create_rolling_operator{}(min_periods, agg); - return launch::type, - op, - PrecedingWindowIterator, - FollowingWindowIterator>(input, - default_outputs, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream, - mr); - } + auto output = + make_fixed_width_column(output_type, input.size(), mask_state::UNINITIALIZED, stream, mr); - // This variant is just to handle mean - template - std::enable_if_t<(op == aggregation::MEAN), std::unique_ptr> operator()( - column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - return launch( - input, - default_outputs, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream, - mr); - } + cudf::mutable_column_view output_view = output->mutable_view(); - template - std::enable_if_t() && - (op == aggregation::LEAD || op == aggregation::LAG), - std::unique_ptr> - operator()(column_view const& input, - column_view const& default_outputs, - PrecedingWindowIterator preceding_window_begin, - FollowingWindowIterator following_window_begin, - size_type min_periods, - 
rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - return launch( - input, - default_outputs, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - cudf::DeviceLeadLag{dynamic_cast(agg).row_offset}, - stream, - mr); + size_type valid_count{0}; + { + using Type = device_storage_type_t; + using OutType = device_storage_type_t>; + + constexpr cudf::size_type block_size = 256; + cudf::detail::grid_1d grid(input.size(), block_size); + + auto input_device_view = column_device_view::create(input, stream); + auto output_device_view = mutable_column_device_view::create(output_view, stream); + auto default_outputs_device_view = column_device_view::create(default_outputs, stream); + + rmm::device_scalar device_valid_count{0, stream}; + + if (input.has_nulls()) { + gpu_rolling + <<>>(*input_device_view, + *default_outputs_device_view, + *output_device_view, + device_valid_count.data(), + device_operator, + preceding_window_begin, + following_window_begin); + } else { + gpu_rolling + <<>>(*input_device_view, + *default_outputs_device_view, + *output_device_view, + device_valid_count.data(), + device_operator, + preceding_window_begin, + following_window_begin); + } + + valid_count = device_valid_count.value(stream); + + // check the stream for debugging + CHECK_CUDA(stream.value()); + } + + output->set_null_count(output->size() - valid_count); + + return output; } template - std::enable_if_t() && - (op == aggregation::LEAD || op == aggregation::LAG), + std::enable_if_t::type::is_supported(), std::unique_ptr> operator()(column_view const& input, column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, FollowingWindowIterator following_window_begin, - size_type min_periods, + int min_periods, rolling_aggregation const& agg, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return cudf::detail:: - compute_lead_lag_for_nested( - input, - default_outputs, - preceding_window_begin, - following_window_begin, - dynamic_cast(agg).row_offset, - stream, - mr); - } - - /** - * @brief Creates the offsets child of the result of the `COLLECT_LIST` window aggregation - * - * Given the input column, the preceding/following window bounds, and `min_periods`, - * the sizes of each list row may be computed. These values can then be used to - * calculate the offsets for the result of `COLLECT_LIST`. - * - * Note: If `min_periods` exceeds the number of observations for a window, the size - * is set to `0` (since the result is `null`). - */ - template - std::unique_ptr create_collect_offsets(size_type input_size, - PrecedingIter preceding_begin, - FollowingIter following_begin, - size_type min_periods, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - // Materialize offsets column. - auto static constexpr size_data_type = data_type{type_to_id()}; - auto sizes = - make_fixed_width_column(size_data_type, input_size, mask_state::UNALLOCATED, stream, mr); - auto mutable_sizes = sizes->mutable_view(); - - // Consider the following preceding/following values: - // preceding = [1,2,2,2,2] - // following = [1,1,1,1,0] - // The sum of the vectors should yield the window sizes: - // prec + foll = [2,3,3,3,2] - // - // If min_periods=2, all rows have at least `min_periods` observations. - // But if min_periods=3, rows at indices 0 and 4 have too few observations, and must return - // null. The sizes at these positions must be 0, i.e. 
- // prec + foll = [0,3,3,3,0] - thrust::transform(rmm::exec_policy(stream), - preceding_begin, - preceding_begin + input_size, - following_begin, - mutable_sizes.begin(), - [min_periods] __device__(auto preceding, auto following) { - return (preceding + following) < min_periods ? 0 : (preceding + following); - }); - - // Convert `sizes` to an offsets column, via inclusive_scan(): - return strings::detail::make_offsets_child_column( - sizes->view().begin(), sizes->view().end(), stream, mr); - } - - /** - * @brief Generate mapping of each row in the COLLECT_LIST result's child column - * to the index of the row it belongs to. - * - * If - * input col == [A,B,C,D,E] - * and preceding == [1,2,2,2,2], - * and following == [1,1,1,1,0], - * then, - * collect result == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ] - * i.e. result offset column == [0,2,5,8,11,13], - * and result child column == [A,B,A,B,C,B,C,D,C,D,E,D,E]. - * Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4] - */ - std::unique_ptr get_list_child_to_list_row_mapping(cudf::column_view const& offsets, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - auto static constexpr size_data_type = data_type{type_to_id()}; - - // First, reduce offsets column by key, to identify the number of times - // an offset appears. - // Next, scatter the count for each offset (except the first and last), - // into a column of N `0`s, where N == number of child rows. - // For the example above: - // offsets == [0, 2, 5, 8, 11, 13] - // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] - // - // If the above example had an empty list row at index 2, - // the same columns would look as follows: - // offsets == [0, 2, 5, 5, 8, 11, 13] - // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] - // - // Note: To correctly handle null list rows at the beginning of - // the output column, care must be taken to skip the first `0` - // in the offsets column, when running `reduce_by_key()`. - // This accounts for the `0` added by default to the offsets - // column, marking the beginning of the column. - - auto const num_child_rows{ - cudf::detail::get_value(offsets, offsets.size() - 1, stream)}; - - auto scatter_values = - make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); - auto scatter_keys = - make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr); - auto reduced_by_key = - thrust::reduce_by_key(rmm::exec_policy(stream), - offsets.template begin() + 1, // Skip first 0 in offsets. - offsets.template end(), - thrust::make_constant_iterator(1), - scatter_keys->mutable_view().template begin(), - scatter_values->mutable_view().template begin()); - auto scatter_values_end = reduced_by_key.second; - auto scatter_output = - make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); - thrust::fill_n(rmm::exec_policy(stream), - scatter_output->mutable_view().template begin(), - num_child_rows, - 0); // [0,0,0,...0] - thrust::scatter( - rmm::exec_policy(stream), - scatter_values->mutable_view().template begin(), - scatter_values_end, - scatter_keys->view().template begin(), - scatter_output->mutable_view().template begin()); // [0,0,1,0,0,1,...] - - // Next, generate mapping with inclusive_scan() on scatter() result. 
- // For the example above: - // scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0] - // inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4] - // - // For the case with an empty list at index 3: - // scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0] - // inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5] - auto per_row_mapping = - make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr); - thrust::inclusive_scan(rmm::exec_policy(stream), - scatter_output->view().template begin(), - scatter_output->view().template end(), - per_row_mapping->mutable_view().template begin()); - return per_row_mapping; - } - - /** - * @brief Create gather map to generate the child column of the result of - * the `COLLECT_LIST` window aggregation. - */ - template - std::unique_ptr create_collect_gather_map(column_view const& child_offsets, - column_view const& per_row_mapping, - PrecedingIter preceding_iter, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - auto gather_map = make_fixed_width_column(data_type{type_to_id()}, - per_row_mapping.size(), - mask_state::UNALLOCATED, - stream, - mr); - thrust::transform( - rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(per_row_mapping.size()), - gather_map->mutable_view().template begin(), - [d_offsets = - child_offsets.template begin(), // E.g. [0, 2, 5, 8, 11, 13] - d_groups = - per_row_mapping.template begin(), // E.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4] - d_prev = preceding_iter] __device__(auto i) { - auto group = d_groups[i]; - auto group_start_offset = d_offsets[group]; - auto relative_index = i - group_start_offset; - - return (group - d_prev[group] + 1) + relative_index; - }); - return gather_map; - } - - /** - * @brief Count null entries in result of COLLECT_LIST. - */ - size_type count_child_nulls(column_view const& input, - std::unique_ptr const& gather_map, - rmm::cuda_stream_view stream) - { - auto input_device_view = column_device_view::create(input, stream); - - auto input_row_is_null = [d_input = *input_device_view] __device__(auto i) { - return d_input.is_null_nocheck(i); - }; - - return thrust::count_if(rmm::exec_policy(stream), - gather_map->view().template begin(), - gather_map->view().template end(), - input_row_is_null); - } - - /** - * @brief Purge entries for null inputs from gather_map, and adjust offsets. - */ - std::pair, std::unique_ptr> purge_null_entries( - column_view const& input, - column_view const& gather_map, - column_view const& offsets, - size_type num_child_nulls, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - auto input_device_view = column_device_view::create(input, stream); - - auto input_row_not_null = [d_input = *input_device_view] __device__(auto i) { - return d_input.is_valid_nocheck(i); - }; - - // Purge entries in gather_map that correspond to null input. - auto new_gather_map = make_fixed_width_column(data_type{type_to_id()}, - gather_map.size() - num_child_nulls, - mask_state::UNALLOCATED, - stream, - mr); - thrust::copy_if(rmm::exec_policy(stream), - gather_map.template begin(), - gather_map.template end(), - new_gather_map->mutable_view().template begin(), - input_row_not_null); - - // Recalculate offsets after null entries are purged. 
- auto new_sizes = make_fixed_width_column( - data_type{type_to_id()}, input.size(), mask_state::UNALLOCATED, stream, mr); - - thrust::transform(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(input.size()), - new_sizes->mutable_view().template begin(), - [d_gather_map = gather_map.template begin(), - d_old_offsets = offsets.template begin(), - input_row_not_null] __device__(auto i) { - return thrust::count_if(thrust::seq, - d_gather_map + d_old_offsets[i], - d_gather_map + d_old_offsets[i + 1], - input_row_not_null); - }); - - auto new_offsets = - strings::detail::make_offsets_child_column(new_sizes->view().template begin(), - new_sizes->view().template end(), - stream, - mr); - - return std::make_pair, std::unique_ptr>( - std::move(new_gather_map), std::move(new_offsets)); - } - - template - std::enable_if_t<(op == aggregation::COLLECT_LIST), std::unique_ptr> operator()( - column_view const& input, - column_view const& default_outputs, - PrecedingIter preceding_begin_raw, - FollowingIter following_begin_raw, - size_type min_periods, - rolling_aggregation const& agg, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - { - CUDF_EXPECTS(default_outputs.is_empty(), - "COLLECT_LIST window function does not support default values."); - - if (input.is_empty()) return empty_like(input); - - // Fix up preceding/following iterators to respect column boundaries, - // similar to gpu_rolling(). - // `rolling_window()` does not fix up preceding/following so as not to read past - // column boundaries. - // `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do. - auto preceding_begin = thrust::make_transform_iterator( - thrust::make_counting_iterator(0), [preceding_begin_raw] __device__(auto i) { - return thrust::min(preceding_begin_raw[i], i + 1); - }); - auto following_begin = thrust::make_transform_iterator( - thrust::make_counting_iterator(0), - [following_begin_raw, size = input.size()] __device__(auto i) { - return thrust::min(following_begin_raw[i], size - i - 1); - }); - - // Materialize collect list's offsets. - auto offsets = create_collect_offsets( - input.size(), preceding_begin, following_begin, min_periods, stream, mr); - - // Map each element of the collect() result's child column - // to the index where it appears in the input. - auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr); - - // Generate gather map to produce the collect() result's child column. - auto gather_map = create_collect_gather_map( - offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr); - - // If gather_map collects null elements, and null_policy == EXCLUDE, - // those elements must be filtered out, and offsets recomputed. - auto null_handling = dynamic_cast(agg)._null_handling; - if (null_handling == null_policy::EXCLUDE && input.has_nulls()) { - auto num_child_nulls = count_child_nulls(input, gather_map, stream); - if (num_child_nulls != 0) { - std::tie(gather_map, offsets) = - purge_null_entries(input, *gather_map, *offsets, num_child_nulls, stream, mr); - } - } - - // gather(), to construct child column. 
- auto gather_output = - cudf::gather(table_view{std::vector{input}}, gather_map->view()); - - rmm::device_buffer null_mask; - size_type null_count; - std::tie(null_mask, null_count) = valid_if( - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(input.size()), - [preceding_begin, following_begin, min_periods] __device__(auto i) { - return (preceding_begin[i] + following_begin[i]) >= min_periods; - }, - stream, - mr); - - return make_lists_column(input.size(), - std::move(offsets), - std::move(gather_output->release()[0]), - null_count, - std::move(null_mask), - stream, - mr); + CUDF_FAIL("Invalid aggregation type/pair"); } }; +/** + * @brief Functor for performing the high level rolling logic. + * + * This does 3 basic things: + * + * - It calls the preprocess step on incoming aggregation/type pairs + * - It calls the aggregation-dispatched gpu-rolling operation + * - It calls the final postprocess step + */ struct dispatch_rolling { - template + template std::unique_ptr operator()(column_view const& input, column_view const& default_outputs, PrecedingWindowIterator preceding_window_begin, @@ -1220,16 +923,40 @@ struct dispatch_rolling { rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return aggregation_dispatcher(agg.kind, - rolling_window_launcher{}, - input, - default_outputs, - preceding_window_begin, - following_window_begin, - min_periods, - agg, - stream, - mr); + // do any preprocessing of aggregations (eg, MIN -> ARGMIN, COLLECT_LIST -> nothing) + rolling_aggregation_preprocessor preprocessor; + auto preprocessed_aggs = agg.get_simple_aggregations(input.type(), preprocessor); + CUDF_EXPECTS(preprocessed_aggs.size() <= 1, + "Encountered a non-trivial rolling aggregation result"); + + // perform the rolling window if we produced an aggregation to use + auto intermediate = preprocessed_aggs.size() > 0 + ? aggregation_dispatcher( + dynamic_cast(*preprocessed_aggs[0]).kind, + rolling_window_launcher{}, + input, + default_outputs, + preceding_window_begin, + following_window_begin, + min_periods, + dynamic_cast(*preprocessed_aggs[0]), + stream, + mr) + : nullptr; + + // finalize. 
+ auto const result_type = target_type(input.type(), agg.kind); + rolling_aggregation_postprocessor postprocessor(input, + default_outputs, + result_type, + preceding_window_begin, + following_window_begin, + min_periods, + std::move(intermediate), + stream, + mr); + agg.finalize(postprocessor); + return postprocessor.get_result(); } }; @@ -1250,8 +977,9 @@ std::unique_ptr rolling_window_udf(column_view const& input, static_assert(warp_size == cudf::detail::size_in_bits(), "bitmask_type size does not match CUDA warp size"); - if (input.has_nulls()) + if (input.has_nulls()) { CUDF_FAIL("Currently the UDF version of rolling window does NOT support inputs with nulls."); + } min_periods = std::max(min_periods, 0); @@ -1333,14 +1061,20 @@ std::unique_ptr rolling_window(column_view const& input, static_assert(warp_size == cudf::detail::size_in_bits(), "bitmask_type size does not match CUDA warp size"); - if (input.is_empty()) return empty_like(input); + if (input.is_empty()) { return empty_like(input); } - if (cudf::is_dictionary(input.type())) + if (cudf::is_dictionary(input.type())) { CUDF_EXPECTS(agg.kind == aggregation::COUNT_ALL || agg.kind == aggregation::COUNT_VALID || agg.kind == aggregation::ROW_NUMBER || agg.kind == aggregation::MIN || agg.kind == aggregation::MAX || agg.kind == aggregation::LEAD || agg.kind == aggregation::LAG, "Invalid aggregation for dictionary column"); + } + + if (agg.kind != aggregation::LEAD && agg.kind != aggregation::LAG) { + CUDF_EXPECTS(default_outputs.is_empty(), + "Only LEAD/LAG window functions support default values."); + } min_periods = std::max(min_periods, 0); @@ -1358,12 +1092,14 @@ std::unique_ptr rolling_window(column_view const& input, agg, stream, mr); + if (!cudf::is_dictionary(input.type())) return output; // dictionary column post processing if (agg.kind == aggregation::COUNT_ALL || agg.kind == aggregation::COUNT_VALID || - agg.kind == aggregation::ROW_NUMBER) + agg.kind == aggregation::ROW_NUMBER) { return output; + } // output is new dictionary indices (including nulls) auto keys = std::make_unique(dictionary_column_view(input).keys(), stream, mr); diff --git a/cpp/src/rolling/rolling_detail.hpp b/cpp/src/rolling/rolling_detail.hpp index 18bd0ea2217..bd64cc39f47 100644 --- a/cpp/src/rolling/rolling_detail.hpp +++ b/cpp/src/rolling/rolling_detail.hpp @@ -25,65 +25,6 @@ namespace cudf { // helper functions - used in the rolling window implementation and tests namespace detail { -// return true the aggregation is valid for the specified ColumnType -// valid aggregations may still be further specialized (eg, is_string_specialized) -template -static constexpr bool is_rolling_supported() -{ - if (!cudf::detail::is_valid_aggregation()) { - return false; - } else if (cudf::is_numeric() or cudf::is_duration()) { - constexpr bool is_comparable_countable_op = std::is_same::value or - std::is_same::value or - std::is_same::value; - - constexpr bool is_operation_supported = - (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or - (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or - (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST); - - constexpr bool is_valid_numeric_agg = - (cudf::is_numeric() or cudf::is_duration() or - is_comparable_countable_op) and - is_operation_supported; - - return is_valid_numeric_agg; - - } else if (cudf::is_timestamp()) { - return (op == aggregation::MIN) or (op == 
aggregation::MAX) or - (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or - (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST); - } else if (cudf::is_fixed_point()) { - return (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or - (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or - (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST); - } else if (std::is_same()) { - return (op == aggregation::MIN) or (op == aggregation::MAX) or - (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT_LIST); - - } else if (std::is_same()) { - return (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or - (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT_LIST); - } else if (std::is_same()) { - // TODO: Add support for COUNT_VALID, COUNT_ALL, ROW_NUMBER. - return op == aggregation::COLLECT_LIST; - } else { - return false; - } -} - -// return true if this Op is specialized for strings. -template -static constexpr bool is_rolling_string_specialization() -{ - return std::is_same::value and - ((aggregation::MIN == Op and std::is_same::value) or - (aggregation::MAX == Op and std::is_same::value)); -} // store functor template diff --git a/cpp/tests/rolling/grouped_rolling_test.cpp b/cpp/tests/rolling/grouped_rolling_test.cpp index 804fd715951..cb123114fd8 100644 --- a/cpp/tests/rolling/grouped_rolling_test.cpp +++ b/cpp/tests/rolling/grouped_rolling_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +#include "rolling_test.hpp" + #include #include #include @@ -336,7 +338,7 @@ class GroupedRollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output(cudf::column_view const& input, std::vector const& group_offsets, size_type const& preceding_window, @@ -393,7 +395,7 @@ class GroupedRollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output(cudf::column_view const& input, std::vector const& group_offsets, size_type const& preceding_window_col, @@ -953,7 +955,7 @@ class GroupedTimeRangeRollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output(cudf::column_view const& timestamp_column, cudf::order const& timestamp_order, cudf::column_view const& input, @@ -1037,7 +1039,7 @@ class GroupedTimeRangeRollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output(cudf::column_view const& timestamp_column, cudf::order const& timestamp_order, cudf::column_view const& input, diff --git a/cpp/tests/rolling/rolling_test.cpp b/cpp/tests/rolling/rolling_test.cpp index c22acf6b022..a67e670acb7 100644 --- a/cpp/tests/rolling/rolling_test.cpp +++ b/cpp/tests/rolling/rolling_test.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "rolling_test.hpp" + #include #include #include @@ -298,7 +300,7 @@ class RollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output( cudf::column_view const& input, std::vector const& preceding_window_col, @@ -353,7 +355,7 @@ class RollingTest : public cudf::test::BaseFixture { cudf::aggregation::Kind k, typename OutputType, bool is_mean, - std::enable_if_t()>* = nullptr> + std::enable_if_t()>* = nullptr> std::unique_ptr create_reference_output( cudf::column_view const& input, std::vector const& preceding_window_col, diff --git a/cpp/tests/rolling/rolling_test.hpp b/cpp/tests/rolling/rolling_test.hpp new file mode 100644 index 00000000000..cca82b15826 --- /dev/null +++ b/cpp/tests/rolling/rolling_test.hpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+// return true if the aggregation is valid for the specified ColumnType
+// valid aggregations may still be further specialized (eg, is_string_specialized)
+template <typename ColumnType, class AggOp, cudf::aggregation::Kind op>
+static constexpr bool is_rolling_supported()
+{
+  using namespace cudf;
+
+  if (!cudf::detail::is_valid_aggregation<ColumnType, op>()) {
+    return false;
+  } else if (cudf::is_numeric<ColumnType>() or cudf::is_duration<ColumnType>()) {
+    constexpr bool is_comparable_countable_op = std::is_same<AggOp, DeviceMin>::value or
+                                                std::is_same<AggOp, DeviceMax>::value or
+                                                std::is_same<AggOp, DeviceCount>::value;
+
+    constexpr bool is_operation_supported =
+      (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or
+      (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
+      (op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
+      (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST);
+
+    constexpr bool is_valid_numeric_agg =
+      (cudf::is_numeric<cudf::detail::target_type_t<ColumnType, op>>() or
+       cudf::is_duration<cudf::detail::target_type_t<ColumnType, op>>() or
+       is_comparable_countable_op) and
+      is_operation_supported;
+
+    return is_valid_numeric_agg;
+
+  } else if (cudf::is_timestamp<ColumnType>()) {
+    return (op == aggregation::MIN) or (op == aggregation::MAX) or
+           (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
+           (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
+           (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST);
+  } else if (cudf::is_fixed_point<ColumnType>()) {
+    return (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or
+           (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
+           (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
+           (op == aggregation::LAG) or (op == aggregation::COLLECT_LIST);
+  } else if (std::is_same<ColumnType, cudf::string_view>()) {
+    return (op == aggregation::MIN) or (op == aggregation::MAX) or
+           (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
+           (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT_LIST);
+
+  } else if (std::is_same<ColumnType, cudf::dictionary32>()) {
+    return (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
+           (op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT_LIST);
+  } else if (std::is_same<ColumnType, cudf::list_view>()) {
+    // TODO: Add support for COUNT_VALID, COUNT_ALL, ROW_NUMBER.
+    return op == aggregation::COLLECT_LIST;
+  } else {
+    return false;
+  }
+}
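For readers skimming the support matrix above, here is a minimal compile-time sketch of how this trait can be exercised, in the spirit of the std::enable_if_t guards it feeds in the test fixtures. It is illustrative only: the <ColumnType, AggOp, op> parameter order follows the reconstruction above, and the availability of DeviceSum/DeviceMin and cudf::string_view through rolling_test.hpp's own includes is an assumption, not part of this diff.

    // Sketch only -- not part of the patch.
    // Assumes the reconstructed is_rolling_supported<ColumnType, AggOp, op>() signature
    // from rolling_test.hpp, and that the cudf headers providing DeviceSum/DeviceMin,
    // cudf::string_view and cudf::aggregation are reachable via its includes.
    #include "rolling_test.hpp"

    #include <cstdint>

    // SUM over a numeric column is a supported rolling aggregation
    // (its target type, int64_t, is itself numeric).
    static_assert(is_rolling_supported<int32_t, cudf::DeviceSum, cudf::aggregation::SUM>(),
                  "rolling SUM over INT32 should be supported");

    // Strings only support MIN/MAX plus the counting/collect aggregations,
    // so SUM over a string column is rejected at compile time.
    static_assert(!is_rolling_supported<cudf::string_view, cudf::DeviceSum, cudf::aggregation::SUM>(),
                  "rolling SUM over STRING should not be supported");
    static_assert(is_rolling_supported<cudf::string_view, cudf::DeviceMin, cudf::aggregation::MIN>(),
                  "rolling MIN over STRING should be supported");

In grouped_rolling_test.cpp and rolling_test.cpp above, this same trait replaces the old cudf::detail::is_rolling_supported guard on the create_reference_output overloads.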