Skip to content

Commit

Permalink
Implement COLLECT rolling window aggregation (#7189)
Browse files Browse the repository at this point in the history
Closes #7133. 

This is an implementation of the `COLLECT` aggregation in the context of rolling window functions. It collects the rows (of type `T`) that fall within the specified window boundaries into a list column (whose elements are of type `T`). One list row is generated per input row. For example, consider the following input column:
```c++
auto input_col = fixed_width_column_wrapper<int32_t>{70, 71, 72, 73, 74};
```
Calling `rolling_window()` with `preceding=2`, `following=1`, `min_periods=1` produces the following:
```c++
auto output_col = cudf::rolling_window(input_col, 2, 1, 1, collect_aggr);
            // == [ [70,71], [70,71,72], [71,72,73], [72,73,74], [73,74] ]
```
`COLLECT` is supported with `rolling_window()`, `grouped_rolling_window()`, and `grouped_time_range_rolling_window()`, across primitive types and arbitrarily nested lists and structs.

`min_periods` is also honoured: if the number of observations in a window is fewer than `min_periods`, the resulting list row is null.

Authors:
  - MithunR (@mythrocks)

Approvers:
  - Keith Kraus (@kkraus14)
  - Vukasin Milovanovic (@vuule)
  - Ram (Ramakrishna Prabhu) (@rgsl888prabhu)

URL: #7189
  • Loading branch information
mythrocks authored Jan 30, 2021
1 parent 019d7cc commit 14b0900
Show file tree
Hide file tree
Showing 6 changed files with 1,014 additions and 33 deletions.
25 changes: 2 additions & 23 deletions cpp/include/cudf/lists/detail/scatter.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/detail/utilities.cuh>
Expand Down Expand Up @@ -160,28 +161,6 @@ rmm::device_uvector<unbound_list_view> list_vector_from_column(
return vector;
}

/**
 * @brief Fetch the number of rows in a lists column's child given its offsets column.
 *
 * The last element of `list_offsets` is copied from device to host on `stream`,
 * and `stream` is synchronized before the value is returned. An empty offsets
 * column yields `0` without touching device memory.
 *
 * @param list_offsets Offsets child of a lists column
 * @param stream The cuda-stream to synchronize on, when reading from device memory
 * @return cudf::size_type The last element in the list_offsets column, indicating
 * the number of rows in the lists-column's child.
 */
cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
                                   rmm::cuda_stream_view stream)
{
  // With no offsets there are no list rows, hence no child rows. Return before
  // forming `data() + size() - 1`, which would point *before* the buffer.
  if (list_offsets.is_empty()) { return 0; }

  // Number of rows in child-column == last offset value.
  cudf::size_type num_child_rows{};
  CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
                           list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
                           sizeof(cudf::size_type),
                           cudaMemcpyDeviceToHost,
                           stream.value()));
  // The copy is asynchronous; the host value is only valid after this sync.
  stream.synchronize();
  return num_child_rows;
}

/**
* @brief Constructs null mask for a scattered list's child column
*
Expand Down
46 changes: 46 additions & 0 deletions cpp/include/cudf/lists/detail/utilities.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/column/column_view.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace detail {

/**
 * @brief Fetch the number of rows in a lists column's child given its offsets column.
 *
 * The last element of `list_offsets` is copied from device to host on `stream`,
 * and `stream` is synchronized before the value is returned. An empty offsets
 * column yields `0` without touching device memory.
 *
 * @param[in] list_offsets Offsets child of a lists column
 * @param[in] stream The cuda-stream to synchronize on, when reading from device memory
 * @return cudf::size_type The number of child rows in the lists column
 */
static cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
                                          rmm::cuda_stream_view stream)
{
  // With no offsets there are no list rows, hence no child rows. Return before
  // forming `data() + size() - 1`, which would point *before* the buffer.
  if (list_offsets.is_empty()) { return 0; }

  // Number of rows in child-column == last offset value.
  cudf::size_type num_child_rows{};
  CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
                           list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
                           sizeof(cudf::size_type),
                           cudaMemcpyDeviceToHost,
                           stream.value()));
  // The copy is asynchronous; the host value is only valid after this sync.
  stream.synchronize();
  return num_child_rows;
}

} // namespace detail
} // namespace cudf
243 changes: 240 additions & 3 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,9 +33,12 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/rolling.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -312,7 +315,7 @@ template <typename InputType,
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG)>* = nullptr>
op == aggregation::LAG || op == aggregation::COLLECT)>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
Expand Down Expand Up @@ -810,7 +813,8 @@ struct rolling_window_launcher {
template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG),
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG ||
op == aggregation::COLLECT),
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& default_outputs,
Expand Down Expand Up @@ -891,6 +895,239 @@ struct rolling_window_launcher {
stream,
mr);
}

/**
 * @brief Builds the offsets child for the list column produced by the `COLLECT`
 * window aggregation.
 *
 * The length of the list at each row is the sum of that row's preceding and
 * following window bounds. Those lengths are materialized into a temporary
 * column and then handed to `strings::detail::make_offsets_child_column()`
 * to obtain the offsets.
 *
 * Note: A window holding fewer than `min_periods` observations contributes a
 * length of `0`, because the corresponding result row is `null`.
 *
 * @param input_size      Number of rows in the input column
 * @param preceding_begin Iterator over the per-row preceding window bounds
 * @param following_begin Iterator over the per-row following window bounds
 * @param min_periods     Minimum number of observations for a non-null result row
 * @param stream          CUDA stream used for device work
 * @param mr              Device memory resource used for allocations
 * @return The offsets column for the `COLLECT` result
 */
template <typename PrecedingIter, typename FollowingIter>
std::unique_ptr<column> create_collect_offsets(size_type input_size,
                                               PrecedingIter preceding_begin,
                                               FollowingIter following_begin,
                                               size_type min_periods,
                                               rmm::cuda_stream_view stream,
                                               rmm::mr::device_memory_resource* mr)
{
  // Scratch column: one list length per input row, no null mask.
  auto window_sizes = make_fixed_width_column(
    data_type{type_to_id<size_type>()}, input_size, mask_state::UNALLOCATED, stream, mr);
  auto d_window_sizes = window_sizes->mutable_view().begin<size_type>();

  // Worked example:
  //   preceding         = [1,2,2,2,2]
  //   following         = [1,1,1,1,0]
  //   window size (sum) = [2,3,3,3,2]
  //
  // With min_periods=2 every row qualifies and the sizes stand as they are.
  // With min_periods=3 the first and last rows fall short and become null,
  // so their sizes are forced to zero:
  //   sizes             = [0,3,3,3,0]
  thrust::transform(rmm::exec_policy(stream),
                    preceding_begin,
                    preceding_begin + input_size,
                    following_begin,
                    d_window_sizes,
                    [min_periods] __device__(auto preceding, auto following) {
                      auto const num_observations = preceding + following;
                      return num_observations >= min_periods ? num_observations : 0;
                    });

  // Turn the per-row lengths into the offsets column of the result.
  auto const sizes_view = window_sizes->view();
  return strings::detail::make_offsets_child_column(
    sizes_view.begin<size_type>(), sizes_view.end<size_type>(), stream, mr);
}

/**
 * @brief Generate mapping of each row in the COLLECT result's child column
 * to the index of the list row it belongs to.
 *
 *  If
 *         input col == [A,B,C,D,E]
 *    and  preceding == [1,2,2,2,2],
 *    and  following == [1,1,1,1,0],
 *  then,
 *        collect result       == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ]
 *   i.e. result offset column == [0,2,5,8,11,13],
 *    and result child column  == [A,B,A,B,C,B,C,D,C,D,E,D,E].
 *    Mapping to list rows     == [0,0,1,1,1,2,2,2,3,3,3,4,4]
 *
 * @param offsets Offsets child of the COLLECT result
 * @param stream  CUDA stream used for device work (synchronized while reading the last offset)
 * @param mr      Device memory resource used for the result and for the scratch columns
 * @return A `size_type` column with one entry per child row, holding the index of
 *         the list row that owns that child row
 */
std::unique_ptr<column> get_list_child_to_list_row_mapping(cudf::column_view const& offsets,
                                                           rmm::cuda_stream_view stream,
                                                           rmm::mr::device_memory_resource* mr)
{
  auto static constexpr size_data_type = data_type{type_to_id<size_type>()};

  // First, reduce offsets column by key, to identify the number of times
  // an offset appears.
  // Next, scatter the count for each offset (except the first and last),
  // into a column of N `0`s, where N == number of child rows.
  // For the example above:
  //   offsets        == [0, 2, 5, 8, 11, 13]
  //   scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
  //
  // If the above example had an empty list row at index 2,
  // the same columns would look as follows:
  //   offsets        == [0, 2, 5, 5, 8, 11, 13]
  //   scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
  //
  // Note: To correctly handle null list rows at the beginning of
  // the output column, care must be taken to skip the first `0`
  // in the offsets column, when running `reduce_by_key()`.
  // This accounts for the `0` added by default to the offsets
  // column, marking the beginning of the column.

  auto const num_child_rows = get_num_child_rows(offsets, stream);

  // Scratch buffers for the (offset value, occurrence count) pairs. There can be
  // no more distinct offsets than there are offsets.
  auto scatter_values =
    make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
  auto scatter_keys =
    make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
  auto reduced_by_key =
    thrust::reduce_by_key(rmm::exec_policy(stream),
                          offsets.template begin<size_type>() + 1,  // Skip first 0 in offsets.
                          offsets.template end<size_type>(),
                          thrust::make_constant_iterator<size_type>(1),
                          scatter_keys->mutable_view().template begin<size_type>(),
                          scatter_values->mutable_view().template begin<size_type>());
  auto scatter_values_end = reduced_by_key.second;
  auto scatter_output =
    make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
  thrust::fill_n(rmm::exec_policy(stream),
                 scatter_output->mutable_view().template begin<size_type>(),
                 num_child_rows,
                 0);  // [0,0,0,...0]

  // The last offset equals `num_child_rows`, i.e. one past the final valid index
  // of `scatter_output`. Scattering its count would write out of bounds (and
  // into a zero-sized buffer when every window is null), so only keys that
  // address an existing child row are scattered.
  thrust::scatter_if(
    rmm::exec_policy(stream),
    scatter_values->mutable_view().template begin<size_type>(),
    scatter_values_end,
    scatter_keys->view().template begin<size_type>(),  // map: destination index
    scatter_keys->view().template begin<size_type>(),  // stencil: the same key, tested below
    scatter_output->mutable_view().template begin<size_type>(),
    [num_child_rows] __device__(auto key) {
      return key < num_child_rows;
    });  // [0,0,1,0,0,1,...]

  // Next, generate mapping with inclusive_scan() on scatter() result.
  // For the example above:
  //   scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
  //   inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4]
  //
  // For the case with an empty list at index 2:
  //   scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
  //   inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5]
  auto per_row_mapping =
    make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
  thrust::inclusive_scan(rmm::exec_policy(stream),
                         scatter_output->view().template begin<size_type>(),
                         scatter_output->view().template end<size_type>(),
                         per_row_mapping->mutable_view().template begin<size_type>());
  return per_row_mapping;
}

/**
 * @brief Builds the gather map that selects, from the input column, the rows
 * forming the child column of the `COLLECT` window aggregation result.
 *
 * For every child row, the owning list row is looked up in `per_row_mapping`;
 * the first input row of that list's window is `list_row - preceding + 1`, and
 * the child row's position inside its list is added to it.
 *
 * @param child_offsets   Offsets of the `COLLECT` result, e.g. [0, 2, 5, 8, 11, 13]
 * @param per_row_mapping Owning list row of each child row,
 *                        e.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4]
 * @param preceding_iter  Iterator over the per-row preceding window bounds
 * @param stream          CUDA stream used for device work
 * @param mr              Device memory resource used for the returned column
 * @return A `size_type` column holding one input-row index per child row
 */
template <typename PrecedingIter>
std::unique_ptr<column> create_collect_gather_map(column_view const& child_offsets,
                                                  column_view const& per_row_mapping,
                                                  PrecedingIter preceding_iter,
                                                  rmm::cuda_stream_view stream,
                                                  rmm::mr::device_memory_resource* mr)
{
  auto const num_child_rows = per_row_mapping.size();
  auto result               = make_fixed_width_column(
    data_type{type_to_id<size_type>()}, num_child_rows, mask_state::UNALLOCATED, stream, mr);

  auto const child_index_begin = thrust::make_counting_iterator<size_type>(0);
  auto const d_list_offsets    = child_offsets.template begin<size_type>();
  auto const d_list_rows       = per_row_mapping.template begin<size_type>();

  thrust::transform(rmm::exec_policy(stream),
                    child_index_begin,
                    child_index_begin + num_child_rows,
                    result->mutable_view().template begin<size_type>(),
                    [d_list_offsets, d_list_rows, preceding_iter] __device__(auto child_index) {
                      // List row that owns this child row.
                      auto const list_row = d_list_rows[child_index];
                      // Position of this child row within its list.
                      auto const position_in_list = child_index - d_list_offsets[list_row];
                      // First input row covered by the window of `list_row`.
                      auto const window_start = list_row - preceding_iter[list_row] + 1;
                      return window_start + position_in_list;
                    });
  return result;
}

/**
 * @brief Launcher overload for the `COLLECT` window aggregation.
 *
 * Produces a lists column with one list row per input row. Each list holds the
 * input rows that fall inside that row's window; a row whose window contains
 * fewer than `min_periods` observations is null. An empty input yields an empty
 * lists column whose child has the input's type.
 *
 * @param input               Column to aggregate
 * @param default_outputs     Must be empty; `COLLECT` does not support default values
 * @param preceding_begin_raw Iterator over per-row preceding window bounds (not yet clamped)
 * @param following_begin_raw Iterator over per-row following window bounds (not yet clamped)
 * @param min_periods         Minimum number of observations for a non-null result row
 * @param agg                 The aggregation descriptor (not read by this overload)
 * @param stream              CUDA stream used for device work
 * @param mr                  Device memory resource used for allocations
 * @return The lists column holding the collected windows
 * @throws cudf::logic_error if `default_outputs` is not empty
 */
template <aggregation::Kind op, typename PrecedingIter, typename FollowingIter>
std::enable_if_t<(op == aggregation::COLLECT), std::unique_ptr<column>> operator()(
  column_view const& input,
  column_view const& default_outputs,
  PrecedingIter preceding_begin_raw,
  FollowingIter following_begin_raw,
  size_type min_periods,
  std::unique_ptr<aggregation> const& agg,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr)
{
  CUDF_EXPECTS(default_outputs.is_empty(),
               "COLLECT window function does not support default values.");

  if (input.is_empty()) {
    // The result of COLLECT is always LIST<input type>. Returning
    // `empty_like(input)` here would hand back a column of the *input's* type,
    // so build a zero-row lists column around an empty child instead.
    return make_lists_column(0,
                             make_empty_column(data_type{type_to_id<size_type>()}),
                             empty_like(input),
                             0,
                             rmm::device_buffer{},
                             stream,
                             mr);
  }

  // Clamp the preceding/following bounds so that no window reaches outside the
  // column, similar to gpu_rolling(). Per the original note, the bounds supplied
  // through `rolling_window()` are not clamped beforehand, whereas those from
  // `grouped_rolling_window()` and `time_range_based_grouped_rolling_window()` are.
  auto preceding_begin = thrust::make_transform_iterator(
    thrust::make_counting_iterator<size_type>(0), [preceding_begin_raw] __device__(auto i) {
      // The current row counts as part of "preceding", hence at most i + 1 rows.
      return thrust::min(preceding_begin_raw[i], i + 1);
    });
  auto following_begin = thrust::make_transform_iterator(
    thrust::make_counting_iterator<size_type>(0),
    [following_begin_raw, size = input.size()] __device__(auto i) {
      // At most the number of rows remaining after row i.
      return thrust::min(following_begin_raw[i], size - i - 1);
    });

  // Materialize collect list's offsets.
  auto offsets = create_collect_offsets(
    input.size(), preceding_begin, following_begin, min_periods, stream, mr);

  // Map each element of the collect() result's child column
  // to the index of the list row that contains it.
  auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr);

  // Generate gather map to produce the collect() result's child column.
  auto gather_map = create_collect_gather_map(
    offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr);

  // gather(), to construct child column.
  // NOTE(review): unlike every other call in this function, this one is given
  // neither `stream` nor `mr`. Confirm it is ordered after the work queued on
  // `stream`, and that the child is allocated from the intended resource.
  auto gather_output =
    cudf::gather(table_view{std::vector<column_view>{input}}, gather_map->view());

  // A list row is valid only if its (clamped) window holds at least
  // `min_periods` observations; this mirrors the zero-size rule used when
  // the offsets were built.
  rmm::device_buffer null_mask;
  size_type null_count;
  std::tie(null_mask, null_count) = valid_if(
    thrust::make_counting_iterator<size_type>(0),
    thrust::make_counting_iterator<size_type>(input.size()),
    [preceding_begin, following_begin, min_periods] __device__(auto i) {
      return (preceding_begin[i] + following_begin[i]) >= min_periods;
    },
    stream,
    mr);

  return make_lists_column(input.size(),
                           std::move(offsets),
                           std::move(gather_output->release()[0]),
                           null_count,
                           std::move(null_mask),
                           stream,
                           mr);
}
};

struct dispatch_rolling {
Expand Down
Loading

0 comments on commit 14b0900

Please sign in to comment.