Skip to content

Commit

Permalink
Use stream pool for scatter.
Browse files Browse the repository at this point in the history
  • Loading branch information
bdice committed Sep 21, 2023
1 parent 7d5e527 commit 988f4d0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 36 deletions.
42 changes: 22 additions & 20 deletions cpp/include/cudf/detail/gather.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -657,31 +657,32 @@ std::unique_ptr<table> gather(table_view const& source_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::vector<std::unique_ptr<column>> destination_columns;
auto const num_columns = source_table.num_columns();
auto result = std::vector<std::unique_ptr<column>>(num_columns);

// The data gather for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
auto const num_columns = source_table.num_columns();
auto streams = std::vector<rmm::cuda_stream_view>{};
auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
} else {
streams.push_back(stream);
}

for (auto i = 0; i < num_columns; ++i) {
CUDF_FUNC_RANGE();
auto it = thrust::make_counting_iterator<size_type>(0);

std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
auto const& source_column = source_table.column(i);
destination_columns.push_back(
cudf::type_dispatcher<dispatch_storage_type>(source_column.type(),
column_gatherer{},
source_column,
gather_map_begin,
gather_map_end,
bounds_policy == out_of_bounds_policy::NULLIFY,
streams[i],
mr));
}
return cudf::type_dispatcher<dispatch_storage_type>(
source_column.type(),
column_gatherer{},
source_column,
gather_map_begin,
gather_map_end,
bounds_policy == out_of_bounds_policy::NULLIFY,
streams[i],
mr);
});

auto const nullable = bounds_policy == out_of_bounds_policy::NULLIFY ||
std::any_of(source_table.begin(), source_table.end(), [](auto const& col) {
Expand All @@ -690,14 +691,15 @@ std::unique_ptr<table> gather(table_view const& source_table,
if (nullable) {
auto const op = bounds_policy == out_of_bounds_policy::NULLIFY ? gather_bitmask_op::NULLIFY
: gather_bitmask_op::DONT_CHECK;
gather_bitmask(source_table, gather_map_begin, destination_columns, op, stream, mr);
gather_bitmask(source_table, gather_map_begin, result, op, stream, mr);
}

// Join streams as late as possible so that the gather_bitmask can run on its
// own stream while other streams are gathering. Skip joining if only one
// column, since it used the passed in stream rather than forking.
// Join streams as late as possible so that null mask computations can run on
// the passed-in stream while the forked streams are gathering. Skip joining if
// there is only one column, since it used the passed-in stream rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }
return std::make_unique<table>(std::move(destination_columns));

return std::make_unique<table>(std::move(result));
}

} // namespace detail
Expand Down
49 changes: 33 additions & 16 deletions cpp/include/cudf/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cudf/detail/gather.cuh>
#include <cudf/detail/indexalator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/dictionary/detail/update_keys.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
Expand Down Expand Up @@ -405,22 +406,32 @@ std::unique_ptr<table> scatter(table_view const& source,
thrust::make_transform_iterator(scatter_map_begin, index_converter<MapType>{target.num_rows()});
auto updated_scatter_map_end =
thrust::make_transform_iterator(scatter_map_end, index_converter<MapType>{target.num_rows()});
auto result = std::vector<std::unique_ptr<column>>(target.num_columns());

std::transform(source.begin(),
source.end(),
target.begin(),
result.begin(),
[=](auto const& source_col, auto const& target_col) {
return type_dispatcher<dispatch_storage_type>(source_col.type(),
column_scatterer{},
source_col,
updated_scatter_map_begin,
updated_scatter_map_end,
target_col,
stream,
mr);
});

auto const num_columns = target.num_columns();
auto result = std::vector<std::unique_ptr<column>>(num_columns);

// The data scatter for n columns will be executed over n streams. If there is
// only a single column, the fork/join overhead should be avoided.
auto streams = std::vector<rmm::cuda_stream_view>{};
if (num_columns > 1) {
streams = cudf::detail::fork_streams(stream, num_columns);
} else {
streams.push_back(stream);
}

auto it = thrust::make_counting_iterator<size_type>(0);

std::transform(it, it + num_columns, result.begin(), [&](size_type i) {
auto const& source_col = source.column(i);
return type_dispatcher<dispatch_storage_type>(source_col.type(),
column_scatterer{},
source_col,
updated_scatter_map_begin,
updated_scatter_map_end,
target.column(i),
streams[i],
mr);
});

// We still need to call `gather_bitmask` even when the source columns are not nullable,
// as if the target has null_mask, that null_mask needs to be updated after scattering.
Expand Down Expand Up @@ -451,6 +462,12 @@ std::unique_ptr<table> scatter(table_view const& source,
}
});
}

// Join streams as late as possible so that null mask computations can run on
// the passed-in stream while the forked streams are scattering. Skip joining if
// there is only one column, since it used the passed-in stream rather than forking.
if (num_columns > 1) { cudf::detail::join_streams(streams, stream); }

return std::make_unique<table>(std::move(result));
}
} // namespace detail
Expand Down

0 comments on commit 988f4d0

Please sign in to comment.