Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement COLLECT rolling window aggregation #7189

Merged
merged 27 commits into from
Jan 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4229fd6
JUNCO: Working prototype:
mythrocks Jan 17, 2021
ca0f651
WIP: Got offsets.
mythrocks Jan 17, 2021
5b639c5
WIP: Got child/input mapping.
mythrocks Jan 19, 2021
3170c18
WIP: Got gather map working.
mythrocks Jan 19, 2021
f0a208e
Working.
mythrocks Jan 19, 2021
d8b834b
WIP: Added fixup for rolling_window() iterators.
mythrocks Jan 20, 2021
2568341
WIP: Added support for empty lists in result.
mythrocks Jan 20, 2021
db73208
WIP: Switch for_each_n() to transform()
mythrocks Jan 20, 2021
871a4ae
WIP: Support for min_periods checks.
mythrocks Jan 21, 2021
3fbaf3e
WIP: Clarify how empty lists are handled...
mythrocks Jan 21, 2021
219cb5d
WIP: Tests!
mythrocks Jan 21, 2021
507239c
WIP: Moved get_num_child_rows() to utilities
mythrocks Jan 22, 2021
095e911
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 22, 2021
aec5ae1
Code formatting.
mythrocks Jan 22, 2021
3aa029b
Moved collect_list_test to .cpp
mythrocks Jan 22, 2021
cf281f0
Const all the things. Added some test descriptions.
mythrocks Jan 23, 2021
06764ad
Added tests for collecting decimal columns
mythrocks Jan 26, 2021
f9e4188
Removed namespace directives, for review.
mythrocks Jan 26, 2021
f897c62
Streamlined null mask construction:
mythrocks Jan 26, 2021
3e0c392
Better documentation for COLLECT helper functions.
mythrocks Jan 26, 2021
04417ed
Update dates in Copyright
mythrocks Jan 26, 2021
93be98c
Using input column sizes instead of whole column
mythrocks Jan 26, 2021
76926d4
Fixed copyrights. Refactored null mask construction.
mythrocks Jan 26, 2021
4bcd852
Test for Input columns with nulls.
mythrocks Jan 27, 2021
6558ac9
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 28, 2021
95a3d49
More tests for nulled inputs.
mythrocks Jan 28, 2021
7115460
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions cpp/include/cudf/lists/detail/scatter.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/detail/utilities.cuh>
Expand Down Expand Up @@ -160,28 +161,6 @@ rmm::device_uvector<unbound_list_view> list_vector_from_column(
return vector;
}

/**
* @brief Fetch the number of rows in a lists column's child given its offsets column.
*
* @param list_offsets Offsets child of a lists column
* @param stream The cuda-stream to synchronize on, when reading from device memory
* @return cudf::size_type The last element in the list_offsets column, indicating
* the number of rows in the lists-column's child.
*/
cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
rmm::cuda_stream_view stream)
{
// Number of rows in child-column == last offset value.
cudf::size_type num_child_rows{};
CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
sizeof(cudf::size_type),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
return num_child_rows;
}

/**
* @brief Constructs null mask for a scattered list's child column
*
Expand Down
46 changes: 46 additions & 0 deletions cpp/include/cudf/lists/detail/utilities.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/column/column_view.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace detail {

/**
* @brief Fetch the number of rows in a lists column's child given its offsets column.
*
* @param[in] list_offsets Offsets child of a lists column
* @param[in] stream The cuda-stream to synchronize on, when reading from device memory
* @return cudf::size_type The number of child rows in the lists column
*/
static cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
rmm::cuda_stream_view stream)
{
// Number of rows in child-column == last offset value.
cudf::size_type num_child_rows{};
CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
sizeof(cudf::size_type),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
return num_child_rows;
}

} // namespace detail
} // namespace cudf
243 changes: 240 additions & 3 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,9 +33,12 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/rolling.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -312,7 +315,7 @@ template <typename InputType,
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG)>* = nullptr>
op == aggregation::LAG || op == aggregation::COLLECT)>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
Expand Down Expand Up @@ -810,7 +813,8 @@ struct rolling_window_launcher {
template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG),
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG ||
op == aggregation::COLLECT),
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& default_outputs,
Expand Down Expand Up @@ -891,6 +895,239 @@ struct rolling_window_launcher {
stream,
mr);
}

/**
* @brief Creates the offsets child of the result of the `COLLECT` window aggregation
*
* Given the input column, the preceding/following window bounds, and `min_periods`,
* the sizes of each list row may be computed. These values can then be used to
* calculate the offsets for the result of `COLLECT`.
*
* Note: If `min_periods` exceeds the number of observations for a window, the size
* is set to `0` (since the result is `null`).
*/
template <typename PrecedingIter, typename FollowingIter>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<column> create_collect_offsets(size_type input_size,
PrecedingIter preceding_begin,
FollowingIter following_begin,
size_type min_periods,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Materialize offsets column.
auto static constexpr size_data_type = data_type{type_to_id<size_type>()};
auto sizes =
make_fixed_width_column(size_data_type, input_size, mask_state::UNALLOCATED, stream, mr);
auto mutable_sizes = sizes->mutable_view();

// Consider the following preceding/following values:
// preceding = [1,2,2,2,2]
// following = [1,1,1,1,0]
// The sum of the vectors should yield the window sizes:
// prec + foll = [2,3,3,3,2]
//
// If min_periods=2, all rows have at least `min_periods` observations.
// But if min_periods=3, rows at indices 0 and 4 have too few observations, and must return
// null. The sizes at these positions must be 0, i.e.
// prec + foll = [0,3,3,3,0]
thrust::transform(rmm::exec_policy(stream),
preceding_begin,
preceding_begin + input_size,
following_begin,
mutable_sizes.begin<size_type>(),
[min_periods] __device__(auto preceding, auto following) {
return (preceding + following) < min_periods ? 0 : (preceding + following);
});

// Convert `sizes` to an offsets column, via inclusive_scan():
return strings::detail::make_offsets_child_column(
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
sizes->view().begin<size_type>(), sizes->view().end<size_type>(), stream, mr);
}

/**
* @brief Generate mapping of each row in the COLLECT result's child column
* to the index of the row it belongs to.
*
* If
* input col == [A,B,C,D,E]
* and preceding == [1,2,2,2,2],
* and following == [1,1,1,1,0],
* then,
* collect result == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ]
* i.e. result offset column == [0,2,5,8,11,13],
* and result child column == [A,B,A,B,C,B,C,D,C,D,E,D,E].
* Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4]
*/
std::unique_ptr<column> get_list_child_to_list_row_mapping(cudf::column_view const& offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto static constexpr size_data_type = data_type{type_to_id<size_type>()};

// First, reduce offsets column by key, to identify the number of times
// an offset appears.
// Next, scatter the count for each offset (except the first and last),
// into a column of N `0`s, where N == number of child rows.
// For the example above:
// offsets == [0, 2, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
//
// If the above example had an empty list row at index 2,
// the same columns would look as follows:
// offsets == [0, 2, 5, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
//
// Note: To correctly handle null list rows at the beginning of
// the output column, care must be taken to skip the first `0`
// in the offsets column, when running `reduce_by_key()`.
// This accounts for the `0` added by default to the offsets
// column, marking the beginning of the column.

auto num_child_rows = get_num_child_rows(offsets, stream);

auto scatter_values =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto scatter_keys =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto reduced_by_key =
thrust::reduce_by_key(rmm::exec_policy(stream),
offsets.template begin<size_type>() + 1, // Skip first 0 in offsets.
offsets.template end<size_type>(),
thrust::make_constant_iterator<size_type>(1),
scatter_keys->mutable_view().template begin<size_type>(),
scatter_values->mutable_view().template begin<size_type>());
auto scatter_values_end = reduced_by_key.second;
auto scatter_output =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill_n(rmm::exec_policy(stream),
scatter_output->mutable_view().template begin<size_type>(),
num_child_rows,
0); // [0,0,0,...0]
thrust::scatter(
rmm::exec_policy(stream),
scatter_values->mutable_view().template begin<size_type>(),
scatter_values_end,
scatter_keys->view().template begin<size_type>(),
scatter_output->mutable_view().template begin<size_type>()); // [0,0,1,0,0,1,...]

// Next, generate mapping with inclusive_scan() on scatter() result.
// For the example above:
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4]
//
// For the case with an empty list at index 3:
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5]
auto per_row_mapping =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::inclusive_scan(rmm::exec_policy(stream),
scatter_output->view().template begin<size_type>(),
scatter_output->view().template end<size_type>(),
per_row_mapping->mutable_view().template begin<size_type>());
return per_row_mapping;
}

/**
* @brief Create gather map to generate the child column of the result of
* the `COLLECT` window aggregation.
*/
template <typename PrecedingIter>
std::unique_ptr<column> create_collect_gather_map(column_view const& child_offsets,
column_view const& per_row_mapping,
PrecedingIter preceding_iter,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto gather_map = make_fixed_width_column(data_type{type_to_id<size_type>()},
per_row_mapping.size(),
mask_state::UNALLOCATED,
stream,
mr);
thrust::transform(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(per_row_mapping.size()),
gather_map->mutable_view().template begin<size_type>(),
[d_offsets =
child_offsets.template begin<size_type>(), // E.g. [0, 2, 5, 8, 11, 13]
d_groups =
per_row_mapping.template begin<size_type>(), // E.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4]
d_prev = preceding_iter] __device__(auto i) {
auto group = d_groups[i];
auto group_start_offset = d_offsets[group];
auto relative_index = i - group_start_offset;

return (group - d_prev[group] + 1) + relative_index;
});
return gather_map;
}

template <aggregation::Kind op, typename PrecedingIter, typename FollowingIter>
std::enable_if_t<(op == aggregation::COLLECT), std::unique_ptr<column>> operator()(
column_view const& input,
column_view const& default_outputs,
PrecedingIter preceding_begin_raw,
FollowingIter following_begin_raw,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(default_outputs.is_empty(),
"COLLECT window function does not support default values.");

if (input.is_empty()) return empty_like(input);

// Fix up preceding/following iterators to respect column boundaries,
// similar to gpu_rolling().
// `rolling_window()` does not fix up preceding/following so as not to read past
// column boundaries.
// `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do.
auto preceding_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0), [preceding_begin_raw] __device__(auto i) {
return thrust::min(preceding_begin_raw[i], i + 1);
});
auto following_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
[following_begin_raw, size = input.size()] __device__(auto i) {
return thrust::min(following_begin_raw[i], size - i - 1);
});

// Materialize collect list's offsets.
auto offsets = create_collect_offsets(
input.size(), preceding_begin, following_begin, min_periods, stream, mr);

// Map each element of the collect() result's child column
// to the index where it appears in the input.
auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr);

// Generate gather map to produce the collect() result's child column.
auto gather_map = create_collect_gather_map(
offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr);

// gather(), to construct child column.
auto gather_output =
cudf::gather(table_view{std::vector<column_view>{input}}, gather_map->view());

rmm::device_buffer null_mask;
size_type null_count;
std::tie(null_mask, null_count) = valid_if(
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(input.size()),
[preceding_begin, following_begin, min_periods] __device__(auto i) {
return (preceding_begin[i] + following_begin[i]) >= min_periods;
},
stream,
mr);

return make_lists_column(input.size(),
std::move(offsets),
std::move(gather_output->release()[0]),
null_count,
std::move(null_mask),
stream,
mr);
}
};

struct dispatch_rolling {
Expand Down
Loading