Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement COLLECT rolling window aggregation #7189

Merged
merged 27 commits into from
Jan 30, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4229fd6
JUNCO: Working prototype:
mythrocks Jan 17, 2021
ca0f651
WIP: Got offsets.
mythrocks Jan 17, 2021
5b639c5
WIP: Got child/input mapping.
mythrocks Jan 19, 2021
3170c18
WIP: Got gather map working.
mythrocks Jan 19, 2021
f0a208e
Working.
mythrocks Jan 19, 2021
d8b834b
WIP: Added fixup for rolling_window() iterators.
mythrocks Jan 20, 2021
2568341
WIP: Added support for empty lists in result.
mythrocks Jan 20, 2021
db73208
WIP: Switch for_each_n() to transform()
mythrocks Jan 20, 2021
871a4ae
WIP: Support for min_periods checks.
mythrocks Jan 21, 2021
3fbaf3e
WIP: Clarify how empty lists are handled...
mythrocks Jan 21, 2021
219cb5d
WIP: Tests!
mythrocks Jan 21, 2021
507239c
WIP: Moved get_num_child_rows() to utilities
mythrocks Jan 22, 2021
095e911
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 22, 2021
aec5ae1
Code formatting.
mythrocks Jan 22, 2021
3aa029b
Moved collect_list_test to .cpp
mythrocks Jan 22, 2021
cf281f0
Const all the things. Added some test descriptions.
mythrocks Jan 23, 2021
06764ad
Added tests for collecting decimal columns
mythrocks Jan 26, 2021
f9e4188
Removed namespace directives, for review.
mythrocks Jan 26, 2021
f897c62
Streamlined null mask construction:
mythrocks Jan 26, 2021
3e0c392
Better documentation for COLLECT helper functions.
mythrocks Jan 26, 2021
04417ed
Update dates in Copyright
mythrocks Jan 26, 2021
93be98c
Using input column sizes instead of whole column
mythrocks Jan 26, 2021
76926d4
Fixed copyrights. Refactored null mask construction.
mythrocks Jan 26, 2021
4bcd852
Test for Input columns with nulls.
mythrocks Jan 27, 2021
6558ac9
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 28, 2021
95a3d49
More tests for nulled inputs.
mythrocks Jan 28, 2021
7115460
Merge remote-tracking branch 'origin/branch-0.18' into collect_list
mythrocks Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 1 addition & 22 deletions cpp/include/cudf/lists/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/detail/utilities.cuh>
Expand Down Expand Up @@ -160,28 +161,6 @@ rmm::device_uvector<unbound_list_view> list_vector_from_column(
return vector;
}

/**
* @brief Fetch the number of rows in a lists column's child given its offsets column.
*
* @param list_offsets Offsets child of a lists column
* @param stream The cuda-stream to synchronize on, when reading from device memory
* @return cudf::size_type The last element in the list_offsets column, indicating
* the number of rows in the lists-column's child.
*/
cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
rmm::cuda_stream_view stream)
{
// Number of rows in child-column == last offset value.
cudf::size_type num_child_rows{};
CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
sizeof(cudf::size_type),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
return num_child_rows;
}

/**
* @brief Constructs null mask for a scattered list's child column
*
Expand Down
46 changes: 46 additions & 0 deletions cpp/include/cudf/lists/detail/utilities.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cudf/column/column_view.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace detail {

/**
* @brief Fetch the number of rows in a lists column's child given its offsets column.
*
* @param[in] list_offsets Offsets child of a lists column
* @param[in] stream The cuda-stream to synchronize on, when reading from device memory
* @return cudf::size_type The number of child rows in the lists column
*/
static cudf::size_type get_num_child_rows(cudf::column_view const& list_offsets,
rmm::cuda_stream_view stream)
{
// Number of rows in child-column == last offset value.
cudf::size_type num_child_rows{};
CUDA_TRY(cudaMemcpyAsync(&num_child_rows,
list_offsets.data<cudf::size_type>() + list_offsets.size() - 1,
sizeof(cudf::size_type),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
return num_child_rows;
}

} // namespace detail
} // namespace cudf
237 changes: 235 additions & 2 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/lists/detail/utilities.cuh>
#include <cudf/rolling.hpp>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -310,7 +313,7 @@ template <typename InputType,
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG)>* = nullptr>
op == aggregation::LAG || op == aggregation::COLLECT)>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
Expand Down Expand Up @@ -814,7 +817,8 @@ struct rolling_window_launcher {
template <aggregation::Kind op,
typename PrecedingWindowIterator,
typename FollowingWindowIterator>
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG),
std::enable_if_t<!(op == aggregation::MEAN || op == aggregation::LEAD || op == aggregation::LAG ||
op == aggregation::COLLECT),
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& default_outputs,
Expand Down Expand Up @@ -895,6 +899,235 @@ struct rolling_window_launcher {
stream,
mr);
}

template <typename PrecedingIter, typename FollowingIter>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<column> get_collect_list_offsets(column_view const& input,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
PrecedingIter preceding_begin,
FollowingIter following_begin,
size_type min_periods,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf;
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

// Materialize offsets column.
auto size_data_type = data_type{type_to_id<size_type>()};
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
auto sizes =
make_fixed_width_column(size_data_type, input.size(), mask_state::UNALLOCATED, stream, mr);
auto mutable_sizes = sizes->mutable_view();
thrust::transform(thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
preceding_begin,
preceding_begin + input.size(),
following_begin,
mutable_sizes.begin<size_type>(),
[min_periods] __device__(auto preceding, auto following) {
return (preceding + following) < min_periods ? 0 : (preceding + following);
});
return strings::detail::make_offsets_child_column(
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
sizes->view().begin<size_type>(), sizes->view().end<size_type>(), stream, mr);
}

template <typename PrecedingIter, typename FollowingIter>
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
std::pair<rmm::device_buffer, size_type> get_collect_list_null_mask(
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
column_view const& input,
PrecedingIter preceding_iter,
FollowingIter following_iter,
size_type min_periods,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
rmm::device_buffer null_mask;
size_type null_count;
std::tie(null_mask, null_count) = valid_if(
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(input.size()),
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
[preceding_iter, following_iter, min_periods] __device__(auto i) {
return (preceding_iter[i] + following_iter[i]) >= min_periods;
},
stream,
mr);
return (null_count == 0) ? std::make_pair(rmm::device_buffer{0, stream, mr}, size_type{0})
: std::make_pair(null_mask, null_count);
}

/**
* @brief Generate collect() list child's mapping to input column.
*
* If
* input col == [A,B,C,D,E]
* and preceding == [1,2,2,2,2],
* and following == [1,1,1,1,0],
* then,
* collect result == [ [A,B], [A,B,C], [B,C,D], [C,D,E], [D,E] ]
* i.e. result offset column == [0,2,5,8,11,13],
* and result child column == [A,B,A,B,C,B,C,D,C,D,E,D,E].
* Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4]
*/
std::unique_ptr<column> get_list_child_to_list_row_mapping(cudf::column_view const& offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf;

auto size_data_type = data_type{type_to_id<size_type>()};
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

// First, reduce offsets column by key, to identify the number of times
// an offset appears.
// Next, scatter the count for each offset (except the first and last),
// into a column of N `0`s, where N == number of child rows.
// For the example above:
// offsets == [0, 2, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
//
// If the above example had an empty list row at index 2,
// the same columns would look as follows:
// offsets == [0, 2, 5, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
//
// Note: To correctly handle null list rows at the beginning of
// the output column, care must be taken to skip the first `0`
// in the offsets column, when running `reduce_by_key()`.
// This accounts for the `0` added by default to the offsets
// column, marking the beginning of the column.

auto num_child_rows = get_num_child_rows(offsets, stream);

auto scatter_values =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto scatter_keys =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto reduced_by_key =
thrust::reduce_by_key(thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
offsets.template begin<size_type>() + 1, // Skip first 0 in offsets.
offsets.template end<size_type>(),
thrust::make_constant_iterator<size_type>(1),
scatter_keys->mutable_view().template begin<size_type>(),
scatter_values->mutable_view().template begin<size_type>());
auto scatter_values_end = reduced_by_key.second;
auto scatter_output =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill_n(thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
scatter_output->mutable_view().template begin<size_type>(),
num_child_rows,
0); // [0,0,0,...0]
thrust::scatter(
thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
scatter_values->mutable_view().template begin<size_type>(),
scatter_values_end,
scatter_keys->view().template begin<size_type>(),
scatter_output->mutable_view().template begin<size_type>()); // [0,0,1,0,0,1,...]

// Next, generate mapping with inclusive_scan() on scatter() result.
// For the example above:
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4]
//
// For the case with an empty list at index 3:
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5]
auto per_row_mapping =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::inclusive_scan(thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
scatter_output->view().template begin<size_type>(),
scatter_output->view().template end<size_type>(),
per_row_mapping->mutable_view().template begin<size_type>());
return per_row_mapping;
}

template <typename PrecedingIter>
std::unique_ptr<column> get_gather_map_for_child_column(column_view const& child_offsets,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
column_view const& per_row_mapping,
PrecedingIter preceding_iter,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto gather_map = make_fixed_width_column(data_type{type_to_id<size_type>()},
per_row_mapping.size(),
mask_state::UNALLOCATED,
stream,
mr);
thrust::transform(
thrust::device,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(per_row_mapping.size()),
gather_map->mutable_view().template begin<size_type>(),
[d_offsets =
child_offsets.template begin<size_type>(), // E.g. [0, 2, 5, 8, 11, 13]
d_groups =
per_row_mapping.template begin<size_type>(), // E.g. [0,0, 1,1,1, 2,2,2, 3,3,3, 4,4]
d_prev = preceding_iter] __device__(auto i) {
auto group = d_groups[i];
auto group_start_offset = d_offsets[group];
auto relative_index = i - group_start_offset;

return (group - d_prev[group] + 1) + relative_index;
});
return gather_map;
}

template <aggregation::Kind op, typename PrecedingIter, typename FollowingIter>
std::enable_if_t<(op == aggregation::COLLECT), std::unique_ptr<column>> operator()(
column_view const& input,
column_view const& default_outputs,
PrecedingIter preceding_begin_raw,
FollowingIter following_begin_raw,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(default_outputs.is_empty(),
"COLLECT window function does not support default values.");

using namespace cudf;
using namespace cudf::detail;
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

if (input.is_empty()) return empty_like(input);

// Fix up preceding/following iterators to respect column boundaries,
// similar to gpu_rolling().
// `rolling_window()` does not fix up preceding/following so as not to read past
// column boundaries.
// `grouped_rolling_window()` and `time_range_based_grouped_rolling_window() do.
auto preceding_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0), [preceding_begin_raw] __device__(auto i) {
return thrust::min(preceding_begin_raw[i], i + 1);
});
auto following_begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
[following_begin_raw, size = input.size()] __device__(auto i) {
return thrust::min(following_begin_raw[i], size - i - 1);
});

// Materialize collect list's offsets.
auto offsets =
get_collect_list_offsets(input, preceding_begin, following_begin, min_periods, stream, mr);

// Map each element of the collect() result's child column
// to the index where it appears in the input.
auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr);

// Generate gather map to produce the collect() result's child column.
auto gather_map = get_gather_map_for_child_column(
offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr);

// gather(), to construct child column.
auto gather_output =
cudf::gather(table_view{std::vector<column_view>{input}}, gather_map->view());

rmm::device_buffer null_mask;
size_type null_count;
std::tie(null_mask, null_count) =
get_collect_list_null_mask(input, preceding_begin, following_begin, min_periods, stream, mr);

return make_lists_column(input.size(),
std::move(offsets),
std::move(gather_output->release()[0]),
null_count,
std::move(null_mask),
stream,
mr);
}
};

struct dispatch_rolling {
Expand Down
18 changes: 12 additions & 6 deletions cpp/src/rolling/rolling_detail.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static constexpr bool is_rolling_supported()
(op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::MEAN) or (op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
(op == aggregation::LAG);
(op == aggregation::LAG) or (op == aggregation::COLLECT);
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

constexpr bool is_valid_numeric_agg =
(cudf::is_numeric<ColumnType>() or cudf::is_duration<ColumnType>() or
Expand All @@ -53,21 +53,27 @@ static constexpr bool is_rolling_supported()
} else if (cudf::is_timestamp<ColumnType>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG);
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
(op == aggregation::LAG) or (op == aggregation::COLLECT);
} else if (cudf::is_fixed_point<ColumnType>()) {
return (op == aggregation::SUM) or (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG);
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or
(op == aggregation::LAG) or (op == aggregation::COLLECT);
} else if (std::is_same<ColumnType, cudf::string_view>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER);
(op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT);

} else if (std::is_same<ColumnType, cudf::list_view>()) {
return (op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER);
} else
(op == aggregation::ROW_NUMBER) or (op == aggregation::COLLECT);
} else if (std::is_same<ColumnType, cudf::struct_view>()) {
// TODO: Add support for COUNT_VALID, COUNT_ALL, ROW_NUMBER.
return op == aggregation::COLLECT;
} else {
return false;
}
}

// return true if this Op is specialized for strings.
Expand Down
Loading