Skip to content

Commit

Permalink
Fix memory write error in get_list_child_to_list_row_mapping utility (#…
Browse files Browse the repository at this point in the history
…8994)

Reference issue #8883 and depends on fixes in PR #8884

The `get_list_child_to_list_row_mapping` builds a map for rolling operation on a lists column. In the `thrust::scatter` call a map value includes the last offset which will always be out-of-bounds to given output vector. This output vector is used to build the resultant output map by calling `thrust::inclusive_scan` but the out-of-bounds offset value is not used -- which is why the utility does not fail. The fix in this PR simply allocates an extra row in the intermediate vector so the `thrust::scatter` will not write to out-of-bounds memory. Since the value is eventually ignored, it does not effect the result.

The code in this function was creating many temporary columns incorrectly using the passed in `device_resource_manager` variable `mr`. The code was corrected by changing these to be just `device_uvector's` instead making it more clear that these are internal temporary memory buffers. Further the code calling `get_list_child_to_list_row_mapping` utility is using the output as a temporary column and so this PR fixes the logic to correct the memory resource usage.

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - MithunR (https://github.com/mythrocks)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #8994
  • Loading branch information
davidwendt authored Aug 23, 2021
1 parent e42464c commit d4c3f32
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 147 deletions.
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,9 @@ add_library(cudf
src/reshape/interleave_columns.cu
src/reshape/tile.cu
src/rolling/grouped_rolling.cu
src/rolling/rolling.cu
src/rolling/range_window_bounds.cpp
src/rolling/rolling.cu
src/rolling/rolling_collect_list.cu
src/round/round.cu
src/scalar/scalar.cpp
src/scalar/scalar_factories.cpp
Expand Down
157 changes: 157 additions & 0 deletions cpp/src/rolling/rolling_collect_list.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <rolling/rolling_collect_list.cuh>

#include <cudf/detail/get_value.cuh>
#include <cudf/detail/iterator.cuh>

#include <rmm/device_uvector.hpp>

#include <thrust/copy.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/scan.h>
#include <thrust/scatter.h>

namespace cudf {
namespace detail {

/**
* @see cudf::detail::get_list_child_to_list_row_mapping
*/
std::unique_ptr<column> get_list_child_to_list_row_mapping(cudf::column_view const& offsets,
rmm::cuda_stream_view stream)
{
// First, scatter the count for each repeated offset (except the first and last),
// into a column of N `0`s, where N == number of child rows.
// For example:
// offsets == [0, 2, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
//
// An example with empty list row at index 2:
// offsets == [0, 2, 5, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
//
auto const num_child_rows{
cudf::detail::get_value<size_type>(offsets, offsets.size() - 1, stream)};
auto per_row_mapping = make_fixed_width_column(
data_type{type_to_id<size_type>()}, num_child_rows, mask_state::UNALLOCATED, stream);
auto per_row_mapping_begin = per_row_mapping->mutable_view().template begin<size_type>();
thrust::fill_n(rmm::exec_policy(stream), per_row_mapping_begin, num_child_rows, 0);

auto const begin = thrust::make_counting_iterator<size_type>(0);
thrust::scatter_if(rmm::exec_policy(stream),
begin,
begin + offsets.size() - 1,
offsets.begin<size_type>(),
begin, // stencil iterator
per_row_mapping_begin,
[offset = offsets.begin<size_type>()] __device__(auto i) {
return offset[i] != offset[i + 1];
}); // [0,0,1,0,0,3,...]

// Next, generate mapping with inclusive_scan(max) on the scatter result.
// For the example above:
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 3, 0, 0, 4, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4]
//
// For the case with an empty list at index 2:
// scatter result == [0, 0, 1, 0, 0, 3, 0, 0, 4, 0, 0, 5, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5]
thrust::inclusive_scan(rmm::exec_policy(stream),
per_row_mapping_begin,
per_row_mapping_begin + num_child_rows,
per_row_mapping_begin,
thrust::maximum<size_type>{});
return per_row_mapping;
}

/**
* @see cudf::detail::count_child_nulls
*/
size_type count_child_nulls(column_view const& input,
std::unique_ptr<column> const& gather_map,
rmm::cuda_stream_view stream)
{
auto input_device_view = column_device_view::create(input, stream);

auto input_row_is_null = [d_input = *input_device_view] __device__(auto i) {
return d_input.is_null_nocheck(i);
};

return thrust::count_if(rmm::exec_policy(stream),
gather_map->view().begin<size_type>(),
gather_map->view().end<size_type>(),
input_row_is_null);
}

/**
* @see cudf::detail::rolling_collect_list
*/
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> purge_null_entries(
column_view const& input,
column_view const& gather_map,
column_view const& offsets,
size_type num_child_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto input_device_view = column_device_view::create(input, stream);

auto input_row_not_null = [d_input = *input_device_view] __device__(auto i) {
return d_input.is_valid_nocheck(i);
};

// Purge entries in gather_map that correspond to null input.
auto new_gather_map = make_fixed_width_column(data_type{type_to_id<size_type>()},
gather_map.size() - num_child_nulls,
mask_state::UNALLOCATED,
stream);
thrust::copy_if(rmm::exec_policy(stream),
gather_map.template begin<size_type>(),
gather_map.template end<size_type>(),
new_gather_map->mutable_view().template begin<size_type>(),
input_row_not_null);

// Recalculate offsets after null entries are purged.
auto new_sizes = make_fixed_width_column(
data_type{type_to_id<size_type>()}, input.size(), mask_state::UNALLOCATED, stream);

thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(input.size()),
new_sizes->mutable_view().template begin<size_type>(),
[d_gather_map = gather_map.template begin<size_type>(),
d_old_offsets = offsets.template begin<size_type>(),
input_row_not_null] __device__(auto i) {
return thrust::count_if(thrust::seq,
d_gather_map + d_old_offsets[i],
d_gather_map + d_old_offsets[i + 1],
input_row_not_null);
});

auto new_offsets =
strings::detail::make_offsets_child_column(new_sizes->view().template begin<size_type>(),
new_sizes->view().template end<size_type>(),
stream,
mr);

return std::make_pair<std::unique_ptr<column>, std::unique_ptr<column>>(std::move(new_gather_map),
std::move(new_offsets));
}

} // namespace detail
} // namespace cudf
163 changes: 17 additions & 146 deletions cpp/src/rolling/rolling_collect_list.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@

#pragma once

#include <cudf/aggregation.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/get_value.cuh>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/strings/detail/utilities.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/reduce.h>
#include <thrust/transform.h>

namespace cudf {
namespace detail {

namespace {
/**
* @brief Creates the offsets child of the result of the `COLLECT_LIST` window aggregation
*
Expand Down Expand Up @@ -97,73 +93,7 @@ std::unique_ptr<column> create_collect_offsets(size_type input_size,
* Mapping back to `input` == [0,1,0,1,2,1,2,3,2,3,4,3,4]
*/
std::unique_ptr<column> get_list_child_to_list_row_mapping(cudf::column_view const& offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto static constexpr size_data_type = data_type{type_to_id<size_type>()};

// First, reduce offsets column by key, to identify the number of times
// an offset appears.
// Next, scatter the count for each offset (except the first and last),
// into a column of N `0`s, where N == number of child rows.
// For the example above:
// offsets == [0, 2, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
//
// If the above example had an empty list row at index 2,
// the same columns would look as follows:
// offsets == [0, 2, 5, 5, 8, 11, 13]
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
//
// Note: To correctly handle null list rows at the beginning of
// the output column, care must be taken to skip the first `0`
// in the offsets column, when running `reduce_by_key()`.
// This accounts for the `0` added by default to the offsets
// column, marking the beginning of the column.

auto const num_child_rows{
cudf::detail::get_value<size_type>(offsets, offsets.size() - 1, stream)};

auto scatter_values =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto scatter_keys =
make_fixed_width_column(size_data_type, offsets.size(), mask_state::UNALLOCATED, stream, mr);
auto reduced_by_key =
thrust::reduce_by_key(rmm::exec_policy(stream),
offsets.template begin<size_type>() + 1, // Skip first 0 in offsets.
offsets.template end<size_type>(),
thrust::make_constant_iterator<size_type>(1),
scatter_keys->mutable_view().template begin<size_type>(),
scatter_values->mutable_view().template begin<size_type>());
auto scatter_values_end = reduced_by_key.second;
auto scatter_output =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill_n(rmm::exec_policy(stream),
scatter_output->mutable_view().template begin<size_type>(),
num_child_rows,
0); // [0,0,0,...0]
thrust::scatter(rmm::exec_policy(stream),
scatter_values->mutable_view().template begin<size_type>(),
scatter_values_end,
scatter_keys->view().template begin<size_type>(),
scatter_output->mutable_view().template begin<size_type>()); // [0,0,1,0,0,1,...]

// Next, generate mapping with inclusive_scan() on scatter() result.
// For the example above:
// scatter result == [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4]
//
// For the case with an empty list at index 3:
// scatter result == [0, 0, 1, 0, 0, 2, 0, 0, 1, 0, 0, 1, 0]
// inclusive_scan == [0, 0, 1, 1, 1, 3, 3, 3, 4, 4, 4, 5, 5]
auto per_row_mapping =
make_fixed_width_column(size_data_type, num_child_rows, mask_state::UNALLOCATED, stream, mr);
thrust::inclusive_scan(rmm::exec_policy(stream),
scatter_output->view().template begin<size_type>(),
scatter_output->view().template end<size_type>(),
per_row_mapping->mutable_view().template begin<size_type>());
return per_row_mapping;
}
rmm::cuda_stream_view stream);

/**
* @brief Create gather map to generate the child column of the result of
Expand All @@ -173,14 +103,10 @@ template <typename PrecedingIter>
std::unique_ptr<column> create_collect_gather_map(column_view const& child_offsets,
column_view const& per_row_mapping,
PrecedingIter preceding_iter,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
rmm::cuda_stream_view stream)
{
auto gather_map = make_fixed_width_column(data_type{type_to_id<size_type>()},
per_row_mapping.size(),
mask_state::UNALLOCATED,
stream,
mr);
auto gather_map = make_fixed_width_column(
data_type{type_to_id<size_type>()}, per_row_mapping.size(), mask_state::UNALLOCATED, stream);
thrust::transform(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
Expand All @@ -205,19 +131,7 @@ std::unique_ptr<column> create_collect_gather_map(column_view const& child_offse
*/
size_type count_child_nulls(column_view const& input,
std::unique_ptr<column> const& gather_map,
rmm::cuda_stream_view stream)
{
auto input_device_view = column_device_view::create(input, stream);

auto input_row_is_null = [d_input = *input_device_view] __device__(auto i) {
return d_input.is_null_nocheck(i);
};

return thrust::count_if(rmm::exec_policy(stream),
gather_map->view().template begin<size_type>(),
gather_map->view().template end<size_type>(),
input_row_is_null);
}
rmm::cuda_stream_view stream);

/**
* @brief Purge entries for null inputs from gather_map, and adjust offsets.
Expand All @@ -228,54 +142,7 @@ std::pair<std::unique_ptr<column>, std::unique_ptr<column>> purge_null_entries(
column_view const& offsets,
size_type num_child_nulls,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto input_device_view = column_device_view::create(input, stream);

auto input_row_not_null = [d_input = *input_device_view] __device__(auto i) {
return d_input.is_valid_nocheck(i);
};

// Purge entries in gather_map that correspond to null input.
auto new_gather_map = make_fixed_width_column(data_type{type_to_id<size_type>()},
gather_map.size() - num_child_nulls,
mask_state::UNALLOCATED,
stream,
mr);
thrust::copy_if(rmm::exec_policy(stream),
gather_map.template begin<size_type>(),
gather_map.template end<size_type>(),
new_gather_map->mutable_view().template begin<size_type>(),
input_row_not_null);

// Recalculate offsets after null entries are purged.
auto new_sizes = make_fixed_width_column(
data_type{type_to_id<size_type>()}, input.size(), mask_state::UNALLOCATED, stream, mr);

thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(input.size()),
new_sizes->mutable_view().template begin<size_type>(),
[d_gather_map = gather_map.template begin<size_type>(),
d_old_offsets = offsets.template begin<size_type>(),
input_row_not_null] __device__(auto i) {
return thrust::count_if(thrust::seq,
d_gather_map + d_old_offsets[i],
d_gather_map + d_old_offsets[i + 1],
input_row_not_null);
});

auto new_offsets =
strings::detail::make_offsets_child_column(new_sizes->view().template begin<size_type>(),
new_sizes->view().template end<size_type>(),
stream,
mr);

return std::make_pair<std::unique_ptr<column>, std::unique_ptr<column>>(std::move(new_gather_map),
std::move(new_offsets));
}

} // anonymous namespace
rmm::mr::device_memory_resource* mr);

template <typename PrecedingIter, typename FollowingIter>
std::unique_ptr<column> rolling_collect_list(column_view const& input,
Expand Down Expand Up @@ -313,11 +180,11 @@ std::unique_ptr<column> rolling_collect_list(column_view const& input,

// Map each element of the collect() result's child column
// to the index where it appears in the input.
auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream, mr);
auto per_row_mapping = get_list_child_to_list_row_mapping(offsets->view(), stream);

// Generate gather map to produce the collect() result's child column.
auto gather_map = create_collect_gather_map(
offsets->view(), per_row_mapping->view(), preceding_begin, stream, mr);
auto gather_map =
create_collect_gather_map(offsets->view(), per_row_mapping->view(), preceding_begin, stream);

// If gather_map collects null elements, and null_policy == EXCLUDE,
// those elements must be filtered out, and offsets recomputed.
Expand All @@ -330,8 +197,12 @@ std::unique_ptr<column> rolling_collect_list(column_view const& input,
}

// gather(), to construct child column.
auto gather_output =
cudf::gather(table_view{std::vector<column_view>{input}}, gather_map->view());
auto gather_output = cudf::detail::gather(table_view{std::vector<column_view>{input}},
gather_map->view(),
cudf::out_of_bounds_policy::DONT_CHECK,
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
mr);

rmm::device_buffer null_mask;
size_type null_count;
Expand Down

0 comments on commit d4c3f32

Please sign in to comment.