
[FEA] COLLECT aggregation for rolling windows #7133

Closed
mythrocks opened this issue Jan 13, 2021 · 4 comments · Fixed by #7189
Labels
feature request New feature or request libcudf Affects libcudf (C++/CUDA) code. proposal Change current process or code Python Affects Python cuDF API. Spark Functionality that helps Spark RAPIDS

Comments

@mythrocks
Contributor

#5874 introduced the COLLECT aggregation to collect grouped rows into lists.

It would be good to support COLLECT in the context of rolling window functions. This would enable the caller to collect rows (of type T) within specified window boundaries into a list column (containing elements of type T). In this context, one list row is generated per input row. Consider the following example:

```c++
auto input_col = fixed_width_column_wrapper<int32_t>{70, 71, 72, 73, 74};
```

Calling rolling_window() with preceding=2, following=1, min_periods=1 should produce the following:

```c++
auto output_col = cudf::rolling_window(input_col, 2, 1, 1, collect_aggr);
            // == [ [70,71], [70,71,72], [71,72,73], [72,73,74], [73,74] ]
```

Note how the first and last list rows are of size 2, because min_periods is 1, and there are only two observations each at the ends.

min_periods should be honoured as before: If the number of observations is fewer than min_periods, the resulting list row is null.
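
For illustration (hypothetical numbers, not taken from the request above): with min_periods = 3 on the same input, the first and last rows see only two observations each, so their list rows come out null:

```c++
// Hypothetical variation: same window bounds, but min_periods = 3.
auto output_col = cudf::rolling_window(input_col, 2, 1, 3, collect_aggr);
            // == [ null, [70,71,72], [71,72,73], [72,73,74], null ]
```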

While the example above was for non-grouped input columns, the behaviour with grouped inputs will be very similar. All that changes is that the calculation of the window boundaries must respect the group boundaries:

```c++
auto input_grp_col  = fixed_width_column_wrapper<int32_t>{ 7,  7,  7,  7,  7,  8,  8,  8};
auto input_aggr_col = fixed_width_column_wrapper<int32_t>{70, 71, 72, 73, 74, 80, 81, 82};
auto output_col     = cudf::grouped_rolling_window(make_table(input_grp_col),
                                                   input_aggr_col, 2, 1, collect_aggr);
      // == [ [70,71], [70,71,72], [71,72,73], [72,73,74], [73,74], [80,81], [80,81,82], [81,82] ];
```
@mythrocks mythrocks added feature request New feature or request Needs Triage Need team to review and classify labels Jan 13, 2021
@mythrocks mythrocks self-assigned this Jan 13, 2021
@harrism harrism added Python Affects Python cuDF API. libcudf Affects libcudf (C++/CUDA) code. Spark Functionality that helps Spark RAPIDS and removed Needs Triage Need team to review and classify labels Jan 13, 2021
@mythrocks
Contributor Author

Proposed implementation

This is the first window aggregation to produce a nested, variable-length result column, which should keep things interesting. What follows is one way to implement COLLECT for windows:

  1. Generate offsets for the resultant lists column from preceding and following.
  2. Generate a gather map that maps each output child row to the input row it is drawn from.
  3. Generate the output child column using cudf::gather() on the input column with the gather map from the previous step.
  4. Assemble the result column from the offsets and child columns.

All the *rolling_window() functions rely on cudf::detail functions in which the preceding/following bounds are exposed via iterators. The calculated bounds respect the column/group/time-range boundaries. In the example above, for preceding=2, following=1:

```c++
using int_col = fixed_width_column_wrapper<int32_t>;
using idx_col = fixed_width_column_wrapper<cudf::size_type>;
auto input_grp_col  = int_col{  7,  7,  7,  7,  7,  8,  8,  8 };
auto input_aggr_col = int_col{ 70, 71, 72, 73, 74, 80, 81, 82 };
//   preceding_col  = idx_col{  1,  2,  2,  2,  2,  1,  2,  2 };  // preceding = 2;
//   following_col  = idx_col{  1,  1,  1,  1,  0,  1,  1,  0 };  // following = 1;
```

Adding preceding and following yields each list row's size. Scanning the sizes yields the offsets for the resultant list column; the last offset is the number of rows in the list child column:

```c++
auto size_data_type = data_type{type_to_id<size_type>()};
auto sizes_col = cudf::binary_operation(preceding_col, following_col, binary_operator::ADD, size_data_type);
           // == { 2, 3, 3, 3, 2, 2, 3, 2 }
auto output_offsets = cudf::strings::detail::make_offsets_child_column(sizes_col->view().begin<size_type>(),
                                                                       sizes_col->view().end<size_type>());
           // == { 0, 2, 5, 8, 11, 13, 15, 18, 20 }

//   num_child_rows == 20;
```

Next, generate an index column mapping each list child row to the index of the list row to which it belongs. (@harrism's advice on #6791 works well.)

```c++
// generate_group_mapping():
//  1. Scatter {1} to all offsets except `0` and `20`.
//  2. inclusive_scan() on the result.
auto per_row_group_mapping = generate_group_mapping(offsets);
// == { 0,0, 1,1,1, 2,2,2, 3,3,3, 4,4, 5,5, 6,6,6, 7,7 };
```
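
generate_group_mapping() above is pseudocode. A minimal Thrust sketch of the same two steps, assuming the offsets are already in device memory (names such as d_offsets, num_rows, and stream are illustrative, not actual libcudf code):

```c++
// Sketch: build the child-row -> list-row mapping from the offsets.
rmm::device_uvector<cudf::size_type> mapping(num_child_rows, stream);
thrust::fill(rmm::exec_policy(stream), mapping.begin(), mapping.end(), 0);

// 1. Scatter 1 at every interior offset (skip the leading 0 and the final offset).
thrust::scatter(rmm::exec_policy(stream),
                thrust::make_constant_iterator<cudf::size_type>(1),
                thrust::make_constant_iterator<cudf::size_type>(1) + (num_rows - 1),
                d_offsets + 1,  // offsets[1] .. offsets[num_rows - 1]
                mapping.begin());

// 2. inclusive_scan() turns the scattered 1s into per-child-row list indices.
thrust::inclusive_scan(rmm::exec_policy(stream),
                       mapping.begin(), mapping.end(), mapping.begin());
// For the example above: { 0,0, 1,1,1, 2,2,2, 3,3,3, 4,4, 5,5, 6,6,6, 7,7 }
// Note: this simple scatter breaks when lists can be empty; see the refinement further down.
```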

Using per_row_group_mapping, offsets, and preceding, we can generate a gather map by iterating from 0 to num_child_rows with the following lambda:

```c++
[per_row_group_mapping, offsets, preceding]
__device__ (auto i) {
    auto group = per_row_group_mapping[i];
    auto group_start_offset = offsets[group];
    auto relative_index = i - group_start_offset;
    return (group - preceding[group] + 1) + relative_index;
}
// ...
// auto gather_map = { 0,1, 0,1,2, 1,2,3, 2,3,4, 3,4, 5,6, 5,6,7, 6,7 }
```
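
One way to materialize the gather map is to evaluate that lambda over a counting iterator (a sketch under the same assumed names as above, not necessarily how the final code will be structured):

```c++
// Sketch: run the lambda over [0, num_child_rows) to fill the gather map.
rmm::device_uvector<cudf::size_type> gather_map(num_child_rows, stream);
thrust::transform(rmm::exec_policy(stream),
                  thrust::make_counting_iterator<cudf::size_type>(0),
                  thrust::make_counting_iterator<cudf::size_type>(num_child_rows),
                  gather_map.begin(),
                  [per_row_group_mapping, offsets, preceding] __device__ (auto i) {
                      auto group          = per_row_group_mapping[i];
                      auto relative_index = i - offsets[group];
                      return (group - preceding[group] + 1) + relative_index;
                  });
// The result backs the gather-map column passed to cudf::gather() below.
```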

The list child column can be assembled by calling cudf::gather() on the input column:

```c++
auto output_child = cudf::gather(input_aggr_col, ...);
//               == { 70,71, 70,71,72, 71,72,73, 72,73,74, 73,74, 80,81, 80,81,82, 81,82 };
```

output_offsets and output_child can be used to assemble the final result list column.
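
For the final assembly, a minimal sketch using cudf::make_lists_column (null-mask handling for min_periods is omitted; this is only an assumption about the shape of the code, not the eventual implementation):

```c++
// Sketch: stitch the offsets and the gathered child into a LIST column.
// A real implementation would also compute a null mask from min_periods.
auto result = cudf::make_lists_column(num_rows,
                                      std::move(output_offsets),
                                      std::move(output_child),   // gathered child column
                                      0,                         // null count
                                      rmm::device_buffer{});     // empty null mask
```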

@mythrocks
Contributor Author

For the record, I have this working in a private branch, outside of rolling.cu. I'll start trying to integrate this approach shortly. It looks like this will be a different code path from the other window aggregations, since the other ones assume a fixed-width, non-nested result.

@mythrocks
Contributor Author

The algorithm needed some refinement to support null and empty rows in the output. The example below illustrates the problem:

```c++
using int_col = fixed_width_column_wrapper<int32_t>;
using idx_col = fixed_width_column_wrapper<cudf::size_type>;
auto input_aggr_col = int_col{ 70, 71, 72, 73, 74 };
//   preceding_col  = idx_col{  0,  2,  2,  0,  2 };
//   following_col  = idx_col{  0,  1,  1,  0,  0 };
```

Empty lists

If min_periods == 0, empty lists are allowed in the output. With the bounds above, the first and fourth rows produce empty lists. Continuing with the method described earlier:

```c++
//  sizes_col   = { 0, 3, 3, 0, 2 };
//  offsets_col = { 0, 0, 3, 6, 6, 8 };
//  scatter_1   = { 0, 0, 0, 1, 0, 0, 1, 0 };
//  incl_scan   = { 0, 0, 0, 1, 1, 1, 2, 2 };
```

The mapping produced by the inclusive_scan() is incorrect:

  1. The first 3 elements of the child should belong to the row at index 1, not 0.
  2. The last 2 elements should belong to the row at index 4, not 2.

An empty (or null) row leads to offsets_col having repeated values. Instead of scattering 1s, we should use reduce_by_key() to dedup the offsets and count their occurrences; it is these counts that must be scattered. For the case above:
```c++
//  offsets_col    = { 0, 0, 3, 6, 6, 8 };
//  reduce_by_key  = { {0, 3, 6, 8},
//                     {1, 1, 2, 1} };
//  scatter_counts = { 1, 0, 0, 1, 0, 0, 2, 0 };
//  incl_scan      = { 1, 1, 1, 2, 2, 2, 4, 4 };
```

Note that reduce_by_key() must skip the first 0 in the offsets column (the default 0 added at the beginning); otherwise, empty lists produced at the beginning of the collect() output column will not be handled correctly.
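
A Thrust sketch of this corrected mapping step (illustrative only; names such as d_offsets, num_rows, and stream are assumptions, and memory handling is simplified):

```c++
// Sketch: dedup repeated offsets and scatter run lengths instead of 1s.
rmm::device_uvector<cudf::size_type> unique_offsets(num_rows, stream);
rmm::device_uvector<cudf::size_type> counts(num_rows, stream);

// reduce_by_key over offsets[1..num_rows] (skipping the leading default 0), with a
// constant value of 1, yields each distinct offset and the number of times it repeats.
auto ends = thrust::reduce_by_key(rmm::exec_policy(stream),
                                  d_offsets + 1, d_offsets + num_rows + 1,
                                  thrust::make_constant_iterator<cudf::size_type>(1),
                                  unique_offsets.begin(), counts.begin());
auto num_unique = thrust::distance(unique_offsets.begin(), ends.first);

// Scatter the counts to their offsets (dropping the trailing offset == num_child_rows),
// then inclusive_scan() to recover the corrected child-row -> list-row mapping.
rmm::device_uvector<cudf::size_type> mapping(num_child_rows, stream);
thrust::fill(rmm::exec_policy(stream), mapping.begin(), mapping.end(), 0);
thrust::scatter(rmm::exec_policy(stream),
                counts.begin(), counts.begin() + (num_unique - 1),
                unique_offsets.begin(), mapping.begin());
thrust::inclusive_scan(rmm::exec_policy(stream),
                       mapping.begin(), mapping.end(), mapping.begin());
// For the example above: { 1, 1, 1, 2, 2, 2, 4, 4 }
```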

This fix has been incorporated in my private branch. A pull request is on the way.

@mythrocks
Contributor Author

A pull request is on the way: #7189

rapids-bot bot pushed a commit that referenced this issue Jan 30, 2021
Closes #7133. 

This is an implementation of the `COLLECT` aggregation in the context of rolling window functions. This enables the collection of rows (of type `T`) within specified window boundaries into a list column (containing elements of type `T`). In this context, one list row is generated per input row. Consider the following example:
```c++
auto input_col = fixed_width_column_wrapper<int32_t>{70, 71, 72, 73, 74};
```
Calling `rolling_window()` with `preceding=2`, `following=1`, `min_periods=1` produces the following:
```c++
auto output_col = cudf::rolling_window(input_col, 2, 1, 1, collect_aggr);
            // == [ [70,71], [70,71,72], [71,72,73], [72,73,74], [73,74] ]
```
`COLLECT` is supported with `rolling_window()`, `grouped_rolling_window()`, and `grouped_time_range_rolling_window()`, across primitive types and arbitrarily nested lists and structs.

`min_periods` is also honoured: if the number of observations is fewer than `min_periods`, the resulting list row is null.

Authors:
  - MithunR (@mythrocks)

Approvers:
  - Keith Kraus (@kkraus14)
  - Vukasin Milovanovic (@vuule)
  - Ram (Ramakrishna Prabhu) (@rgsl888prabhu)

URL: #7189