Skip to content

Commit

Permalink
Use nosync policy in gather and scatter implementations. (#12038)
Browse files Browse the repository at this point in the history
This PR uses `rmm::exec_policy_nosync` in libcudf's gather and scatter functions. These changes are motivated by performance improvements seen previously in #11577.

# Checklist
- [x] I am familiar with the [Contributing Guidelines](https://github.com/rapidsai/cudf/blob/HEAD/CONTRIBUTING.md).
- [x] New or existing tests cover these changes.
- [x] The documentation is up to date with these changes.

Authors:
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

URL: #12038
  • Loading branch information
bdice authored Nov 7, 2022
1 parent ec46e7f commit 2ced214
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 26 deletions.
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void gather_helper(InputItr source_itr,
{
using map_type = typename std::iterator_traits<MapIterator>::value_type;
if (nullify_out_of_bounds) {
thrust::gather_if(rmm::exec_policy(stream),
thrust::gather_if(rmm::exec_policy_nosync(stream),
gather_map_begin,
gather_map_end,
gather_map_begin,
Expand All @@ -137,7 +137,7 @@ void gather_helper(InputItr source_itr,
bounds_checker<map_type>{0, source_size});
} else {
thrust::gather(
rmm::exec_policy(stream), gather_map_begin, gather_map_end, source_itr, target_itr);
rmm::exec_policy_nosync(stream), gather_map_begin, gather_map_end, source_itr, target_itr);
}
}

Expand Down
12 changes: 6 additions & 6 deletions cpp/include/cudf/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ auto scatter_to_gather(MapIterator scatter_map_begin,
// We'll use the `numeric_limits::lowest()` value for this since it should always be outside the
// valid range.
auto gather_map = rmm::device_uvector<size_type>(gather_rows, stream);
thrust::uninitialized_fill(rmm::exec_policy(stream),
thrust::uninitialized_fill(rmm::exec_policy_nosync(stream),
gather_map.begin(),
gather_map.end(),
std::numeric_limits<size_type>::lowest());

// Convert scatter map to a gather map
thrust::scatter(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<MapValueType>(0),
thrust::make_counting_iterator<MapValueType>(std::distance(scatter_map_begin, scatter_map_end)),
scatter_map_begin,
Expand Down Expand Up @@ -114,13 +114,13 @@ auto scatter_to_gather_complement(MapIterator scatter_map_begin,
rmm::cuda_stream_view stream)
{
auto gather_map = rmm::device_uvector<size_type>(gather_rows, stream);
thrust::sequence(rmm::exec_policy(stream), gather_map.begin(), gather_map.end(), 0);
thrust::sequence(rmm::exec_policy_nosync(stream), gather_map.begin(), gather_map.end(), 0);

auto const out_of_bounds_begin =
thrust::make_constant_iterator(std::numeric_limits<size_type>::lowest());
auto const out_of_bounds_end =
out_of_bounds_begin + thrust::distance(scatter_map_begin, scatter_map_end);
thrust::scatter(rmm::exec_policy(stream),
thrust::scatter(rmm::exec_policy_nosync(stream),
out_of_bounds_begin,
out_of_bounds_end,
scatter_map_begin,
Expand Down Expand Up @@ -152,7 +152,7 @@ struct column_scatterer_impl<Element, std::enable_if_t<cudf::is_fixed_width<Elem

// NOTE use source.begin + scatter rows rather than source.end in case the
// scatter map is smaller than the number of source rows
thrust::scatter(rmm::exec_policy(stream),
thrust::scatter(rmm::exec_policy_nosync(stream),
source.begin<Element>(),
source.begin<Element>() + cudf::distance(scatter_map_begin, scatter_map_end),
scatter_map_begin,
Expand Down Expand Up @@ -226,7 +226,7 @@ struct column_scatterer_impl<dictionary32> {
auto source_itr = indexalator_factory::make_input_iterator(source_view.indices());
auto new_indices = std::make_unique<column>(target_view.get_indices_annotated(), stream, mr);
auto target_itr = indexalator_factory::make_output_iterator(new_indices->mutable_view());
thrust::scatter(rmm::exec_policy(stream),
thrust::scatter(rmm::exec_policy_nosync(stream),
source_itr,
source_itr + std::distance(scatter_map_begin, scatter_map_end),
scatter_map_begin,
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/lists/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ gather_data make_gather_data(cudf::lists_column_view const& source_column,
// generate the compacted outgoing offsets.
auto count_iter = thrust::make_counting_iterator<int32_t>(0);
thrust::transform_exclusive_scan(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
count_iter,
count_iter + offset_count,
dst_offsets_v.begin<int32_t>(),
Expand Down Expand Up @@ -125,7 +125,7 @@ gather_data make_gather_data(cudf::lists_column_view const& source_column,
// generate the base offsets
rmm::device_uvector<int32_t> base_offsets = rmm::device_uvector<int32_t>(output_count, stream);
thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
gather_map,
gather_map + output_count,
base_offsets.data(),
Expand Down
6 changes: 3 additions & 3 deletions cpp/include/cudf/lists/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rmm::device_uvector<unbound_list_view> list_vector_from_column(

auto vector = rmm::device_uvector<unbound_list_view>(n_rows, stream, mr);

thrust::transform(rmm::exec_policy(stream),
thrust::transform(rmm::exec_policy_nosync(stream),
index_begin,
index_end,
vector.begin(),
Expand Down Expand Up @@ -104,7 +104,7 @@ std::unique_ptr<column> scatter_impl(
auto const child_column_type = lists_column_view(target).child().type();

// Scatter.
thrust::scatter(rmm::exec_policy(stream),
thrust::scatter(rmm::exec_policy_nosync(stream),
source_vector.begin(),
source_vector.end(),
scatter_map_begin,
Expand Down Expand Up @@ -239,7 +239,7 @@ std::unique_ptr<column> scatter(
: cudf::detail::create_null_mask(1, mask_state::ALL_NULL, stream, mr);
auto offset_column = make_numeric_column(
data_type{type_to_id<offset_type>()}, 2, mask_state::UNALLOCATED, stream, mr);
thrust::sequence(rmm::exec_policy(stream),
thrust::sequence(rmm::exec_policy_nosync(stream),
offset_column->mutable_view().begin<offset_type>(),
offset_column->mutable_view().end<offset_type>(),
0,
Expand Down
10 changes: 6 additions & 4 deletions cpp/include/cudf/strings/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings,
auto const d_in_offsets = (strings_count > 0) ? strings.offsets_begin() : nullptr;
auto const d_strings = column_device_view::create(strings.parent(), stream);
thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
begin,
end,
d_out_offsets,
Expand All @@ -317,7 +317,7 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings,
// check total size is not too large
size_t const total_bytes = thrust::transform_reduce(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
d_out_offsets,
d_out_offsets + output_count,
[] __device__(auto size) { return static_cast<size_t>(size); },
Expand All @@ -327,8 +327,10 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings,
"total size of output strings is too large for a cudf column");
// In-place convert output sizes into offsets
thrust::exclusive_scan(
rmm::exec_policy(stream), d_out_offsets, d_out_offsets + output_count + 1, d_out_offsets);
thrust::exclusive_scan(rmm::exec_policy_nosync(stream),
d_out_offsets,
d_out_offsets + output_count + 1,
d_out_offsets);
// build chars column
cudf::device_span<int32_t const> const d_out_offsets_span(d_out_offsets, output_count + 1);
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/cudf/strings/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ std::unique_ptr<column> scatter(SourceIterator begin,
begin, [] __device__(string_view const sv) { return sv.empty() ? string_view{} : sv; });

// do the scatter
thrust::scatter(rmm::exec_policy(stream), itr, itr + size, scatter_map, target_vector.begin());
thrust::scatter(
rmm::exec_policy_nosync(stream), itr, itr + size, scatter_map, target_vector.begin());

// build the output column
auto sv_span = cudf::device_span<string_view const>(target_vector);
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct column_scalar_scatterer_impl {
auto scalar_iter =
thrust::make_permutation_iterator(scalar_impl->data(), thrust::make_constant_iterator(0));

thrust::scatter(rmm::exec_policy(stream),
thrust::scatter(rmm::exec_policy_nosync(stream),
scalar_iter,
scalar_iter + scatter_rows,
scatter_iter,
Expand Down Expand Up @@ -191,8 +191,11 @@ struct column_scalar_scatterer_impl<dictionary32, MapIterator> {
auto new_indices = std::make_unique<column>(dict_view.get_indices_annotated(), stream, mr);
auto target_iter = indexalator_factory::make_output_iterator(new_indices->mutable_view());

thrust::scatter(
rmm::exec_policy(stream), scalar_iter, scalar_iter + scatter_rows, scatter_iter, target_iter);
thrust::scatter(rmm::exec_policy_nosync(stream),
scalar_iter,
scalar_iter + scatter_rows,
scatter_iter,
target_iter);

// build the dictionary indices column from the result
auto const indices_type = new_indices->type();
Expand Down Expand Up @@ -383,7 +386,7 @@ std::unique_ptr<column> boolean_mask_scatter(column_view const& input,
data_type{type_id::INT32}, target.size(), mask_state::UNALLOCATED, stream);
auto mutable_indices = indices->mutable_view();

thrust::sequence(rmm::exec_policy(stream),
thrust::sequence(rmm::exec_policy_nosync(stream),
mutable_indices.begin<size_type>(),
mutable_indices.end<size_type>(),
0);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/lists/copying/scatter_helper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ struct list_child_constructor {
mr);

thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(child_column->size()),
child_column->mutable_view().begin<T>(),
Expand Down Expand Up @@ -237,7 +237,7 @@ struct list_child_constructor {
auto const null_string_view = string_view{nullptr, 0}; // placeholder for factory function

thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(string_views.size()),
string_views.begin(),
Expand Down Expand Up @@ -304,7 +304,7 @@ struct list_child_constructor {
// For instance, if a parent list_device_view has 3 elements, it should have 3 corresponding
// child list_device_view instances.
thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(child_list_views.size()),
child_list_views.begin(),
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/copying/detail_gather_tests.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TYPED_TEST(GatherTest, GatherDetailDeviceVectorTest)
constexpr cudf::size_type source_size{1000};
rmm::device_uvector<cudf::size_type> gather_map(source_size, cudf::get_default_stream());
thrust::sequence(
rmm::exec_policy(cudf::get_default_stream()), gather_map.begin(), gather_map.end());
rmm::exec_policy_nosync(cudf::get_default_stream()), gather_map.begin(), gather_map.end());

auto data = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; });
cudf::test::fixed_width_column_wrapper<TypeParam> source_column(data, data + source_size);
Expand Down

0 comments on commit 2ced214

Please sign in to comment.