Replace raw streams with rmm::cuda_stream_view (part 2) (#6648)
Converting libcudf to use rmm::cuda_stream_view will require a LOT of changes, so I'm splitting it into multiple PRs to ease reviewing. This is the second PR in the series. Together, the PRs in this series will:

    Replace usage of cudaStream_t with rmm::cuda_stream_view
    Replace usage of 0 or nullptr as a stream identifier with rmm::cuda_stream_default
    Ensure all APIs always order the stream parameter before the memory resource parameter (#5119); a before/after sketch follows this list.
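
For reviewers new to the pattern, here is a minimal before/after sketch of the convention (the function is hypothetical, not from this diff; the parameter shapes are the ones used throughout the series):

```cpp
// Before: raw stream handle, defaulted to 0, ordered after the memory resource.
std::unique_ptr<column> transform_widget(
  column_view const& input,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
  cudaStream_t stream                 = 0);

// After: strongly typed stream view, ordered before the memory resource.
std::unique_ptr<column> transform_widget(
  column_view const& input,
  rmm::cuda_stream_view stream        = rmm::cuda_stream_default,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
```
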

Contributes to #6645 and #5119

Depends on #6646, so this PR will look much bigger until that one is merged.

Also fixes #6706 (`to_arrow` and `to_dlpack` were not properly synchronized); a sketch of the fix is below.
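
The synchronization bug has the usual shape: work is enqueued on a stream and the result is handed to a consumer that knows nothing about that stream. A minimal sketch of the fix, with hypothetical helper names:

```cpp
// Enqueue device work on `stream`, then synchronize before the result escapes
// to a stream-unaware consumer such as Arrow or DLPack.
auto export_for_host(cudf::column_view const& input, rmm::cuda_stream_view stream)
{
  auto buffer = copy_to_host_async(input, stream);  // hypothetical async copy
  stream.synchronize();  // without this, the consumer may read incomplete data
  return buffer;
}
```
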

This second PR converts:

    table.hpp (and source / dependencies)
    column_factories.hpp (and source / dependencies)
    headers in include/detail/aggregation (and source / dependencies)
    include/detail/groupby/sort_helper.hpp (and source / dependencies)
    include/detail/utilities/cuda.cuh (and dependencies)
    binary ops
    concatenate
    copy_if
    copy_range
    fill
    gather
    get_value
    hash groupby
    quantiles
    reductions
    repeat
    replace
    reshape
    round
    scatter
    search
    sequence
    sorting
    stream compaction
harrism authored Nov 20, 2020
1 parent 263ec65 commit 7e51022
Showing 163 changed files with 2,845 additions and 2,552 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -66,6 +66,7 @@
- PR #6610 Add ability to set scalar values in `cudf.DataFrame`
- PR #6612 Update JNI to new RMM cuda_stream_view API
- PR #6646 Replace `cudaStream_t` with `rmm::cuda_stream_view` (part 1)
- PR #6648 Replace `cudaStream_t` with `rmm::cuda_stream_view` (part 2)
- PR #6579 Update scatter APIs to use reference wrapper / const scalar
- PR #6614 Add support for conversion to Pandas nullable dtypes and fix related issue in `cudf.to_json`
- PR #6622 Update `to_pandas` api docs
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
@@ -76,6 +76,7 @@ test:
- test -f $PREFIX/include/cudf/detail/replace.hpp
- test -f $PREFIX/include/cudf/detail/reshape.hpp
- test -f $PREFIX/include/cudf/detail/round.hpp
- test -f $PREFIX/include/cudf/detail/quantiles.hpp
- test -f $PREFIX/include/cudf/detail/scatter.hpp
- test -f $PREFIX/include/cudf/detail/search.hpp
- test -f $PREFIX/include/cudf/detail/sequence.hpp
152 changes: 77 additions & 75 deletions cpp/include/cudf/column/column_factories.hpp

Large diffs are not rendered by default.
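
Since this diff is not rendered, here is a representative factory signature after conversion. This is a sketch based on the series-wide pattern, not a quote from the header; consult column_factories.hpp for the authoritative declarations:

```cpp
// Stream now precedes the memory resource, and both carry defaults.
std::unique_ptr<column> make_numeric_column(
  data_type type,
  size_type size,
  mask_state state                    = mask_state::UNALLOCATED,
  rmm::cuda_stream_view stream        = rmm::cuda_stream_default,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
```
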

15 changes: 10 additions & 5 deletions cpp/include/cudf/detail/aggregation/aggregation.cuh
@@ -23,6 +23,8 @@
#include <cudf/detail/utilities/release_assert.cuh>
#include <cudf/table/table_device_view.cuh>

#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace detail {
/**
@@ -409,15 +411,17 @@ struct identity_initializer {
public:
template <typename T, aggregation::Kind k>
std::enable_if_t<is_supported<T, k>(), void> operator()(mutable_column_view const& col,
cudaStream_t stream = 0)
rmm::cuda_stream_view stream)
{
thrust::fill(
rmm::exec_policy(stream)->on(stream), col.begin<T>(), col.end<T>(), get_identity<T, k>());
thrust::fill(rmm::exec_policy(stream)->on(stream.value()),
col.begin<T>(),
col.end<T>(),
get_identity<T, k>());
}

template <typename T, aggregation::Kind k>
std::enable_if_t<not is_supported<T, k>(), void> operator()(mutable_column_view const& col,
cudaStream_t stream = 0)
rmm::cuda_stream_view stream)
{
CUDF_FAIL("Unsupported aggregation for initializing values");
}
@@ -436,10 +440,11 @@ struct identity_initializer {
* @param table The table of columns to initialize.
* @param aggs A vector of aggregation operations corresponding to the table
* columns. The aggregations determine the identity value for each column.
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
void initialize_with_identity(mutable_table_view& table,
std::vector<aggregation::Kind> const& aggs,
cudaStream_t stream = 0);
rmm::cuda_stream_view stream);

} // namespace detail
} // namespace cudf
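
The recurring Thrust pattern above passes the stream twice: the view goes to `rmm::exec_policy` (which needs it for temporary allocations) and the raw handle goes to `->on()` (which Thrust needs for execution). A minimal sketch, assuming the era's `rmm::exec_policy` from `<rmm/thrust_rmm_allocator.h>`; later RMM versions accept the view directly:

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/thrust_rmm_allocator.h>  // rmm::exec_policy of this era (assumption)

#include <thrust/fill.h>

// Run a Thrust algorithm on a caller-provided stream.
void fill_on_stream(rmm::device_uvector<int>& data, rmm::cuda_stream_view stream)
{
  thrust::fill(rmm::exec_policy(stream)->on(stream.value()),
               data.begin(), data.end(), 0);
}
```
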
21 changes: 11 additions & 10 deletions cpp/include/cudf/detail/binaryop.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2019, NVIDIA CORPORATION.
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/binaryop.hpp>

#include <rmm/cuda_stream_view.hpp>

namespace cudf {
//! Inner interfaces and implementations
namespace detail {
@@ -32,8 +33,8 @@ std::unique_ptr<column> binary_operation(
column_view const& rhs,
binary_operator op,
data_type output_type,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::binary_operation(column_view const&, scalar const&, binary_operator,
@@ -46,8 +47,8 @@ std::unique_ptr<column> binary_operation(
scalar const& rhs,
binary_operator op,
data_type output_type,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::binary_operation(column_view const&, column_view const&,
Expand All @@ -60,8 +61,8 @@ std::unique_ptr<column> binary_operation(
column_view const& rhs,
binary_operator op,
data_type output_type,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::binary_operation(column_view const&, column_view const&,
@@ -74,8 +75,8 @@ std::unique_ptr<column> binary_operation(
column_view const& rhs,
std::string const& ptx,
data_type output_type,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace detail
} // namespace cudf
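
A caller-side sketch of the reordered detail API above; because the stream now comes first and both trailing parameters are defaulted, a caller that only customizes the stream no longer spells out the memory resource:

```cpp
#include <cudf/column/column.hpp>
#include <cudf/detail/binaryop.hpp>

#include <rmm/cuda_stream_view.hpp>

std::unique_ptr<cudf::column> add_on_stream(cudf::column_view const& lhs,
                                            cudf::column_view const& rhs,
                                            rmm::cuda_stream_view stream)
{
  // mr defaults to rmm::mr::get_current_device_resource().
  return cudf::detail::binary_operation(
    lhs, rhs, cudf::binary_operator::ADD, cudf::data_type{cudf::type_id::INT32},
    stream);
}
```
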
6 changes: 4 additions & 2 deletions cpp/include/cudf/detail/concatenate.cuh
@@ -21,6 +21,8 @@
#include <cudf/detail/concatenate.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <vector>

namespace cudf {
@@ -36,7 +38,7 @@ void concatenate_masks(rmm::device_vector<column_device_view> const& d_views,
rmm::device_vector<size_t> const& d_offsets,
bitmask_type* dest_mask,
size_type output_size,
cudaStream_t stream);
rmm::cuda_stream_view stream);

/**
* @copydoc cudf::concatenate_masks(std::vector<column_view> const&,bitmask_type*)
@@ -45,7 +47,7 @@ void concatenate_masks(rmm::device_vector<column_device_view> const& d_views,
*/
void concatenate_masks(std::vector<column_view> const& views,
bitmask_type* dest_mask,
cudaStream_t stream);
rmm::cuda_stream_view stream);

} // namespace detail
} // namespace cudf
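
Note the convention: internal helpers like these take a non-defaulted stream, so every call site states one explicitly, while the outward-facing detail functions in the next file default to `rmm::cuda_stream_default`. A sketch of an explicit call:

```cpp
// Hypothetical wrapper showing the mandatory, never-implicit stream argument.
void build_concatenated_mask(std::vector<cudf::column_view> const& views,
                             cudf::bitmask_type* dest_mask,
                             rmm::cuda_stream_view stream)
{
  cudf::detail::concatenate_masks(views, dest_mask, stream);
}
```
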
10 changes: 6 additions & 4 deletions cpp/include/cudf/detail/concatenate.hpp
@@ -19,6 +19,8 @@
#include <cudf/concatenate.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <vector>

namespace cudf {
@@ -31,8 +33,8 @@
*/
std::unique_ptr<column> concatenate(
std::vector<column_view> const& columns_to_concat,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::concatenate(std::vector<table_view> const&,rmm::mr::device_memory_resource*)
@@ -41,8 +43,8 @@ std::unique_ptr<column> concatenate(
*/
std::unique_ptr<table> concatenate(
std::vector<table_view> const& tables_to_concat,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace detail
} // namespace cudf
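
A usage sketch of the declaration above: concatenation runs on the caller's stream, and the memory resource falls back to the current device resource:

```cpp
std::unique_ptr<cudf::column> concat_on_stream(
  std::vector<cudf::column_view> const& parts, rmm::cuda_stream_view stream)
{
  return cudf::detail::concatenate(parts, stream);
}
```
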
53 changes: 27 additions & 26 deletions cpp/include/cudf/detail/copy_if.cuh
@@ -32,6 +32,7 @@
#include <cudf/utilities/traits.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>
@@ -210,8 +211,8 @@ struct scatter_gather_functor {
cudf::size_type const* block_offsets,
Filter filter,
cudf::size_type per_thread,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0)
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
auto output_column = cudf::detail::allocate_like(
input, output_size, cudf::mask_allocation_policy::RETAIN, stream, mr);
@@ -231,18 +232,18 @@ struct scatter_gather_functor {
CUDA_TRY(cudaMemsetAsync(static_cast<void*>(output.null_mask()),
0,
cudf::bitmask_allocation_size_bytes(output.size()),
stream));
stream.value()));
}

auto output_device_view = cudf::mutable_column_device_view::create(output, stream);
auto input_device_view = cudf::column_device_view::create(input, stream);
scatter<<<grid.num_blocks, block_size, 0, stream>>>(*output_device_view,
null_count.data(),
*input_device_view,
block_offsets,
input.size(),
per_thread,
filter);
scatter<<<grid.num_blocks, block_size, 0, stream.value()>>>(*output_device_view,
null_count.data(),
*input_device_view,
block_offsets,
input.size(),
per_thread,
filter);

if (has_valid) { output_column->set_null_count(null_count.value(stream)); }
return output_column;
@@ -256,19 +257,19 @@ struct scatter_gather_functor {
cudf::size_type const* block_offsets,
Filter filter,
cudf::size_type per_thread,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0)
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
rmm::device_uvector<cudf::size_type> indices(output_size, stream);

thrust::copy_if(rmm::exec_policy(stream)->on(stream),
thrust::copy_if(rmm::exec_policy(stream)->on(stream.value()),
thrust::counting_iterator<cudf::size_type>(0),
thrust::counting_iterator<cudf::size_type>(input.size()),
indices.begin(),
filter);

auto output_table = cudf::detail::gather(
cudf::table_view{{input}}, indices.begin(), indices.end(), false, mr, stream);
cudf::table_view{{input}}, indices.begin(), indices.end(), false, stream, mr);

// There will be only one column
return std::make_unique<cudf::column>(std::move(output_table->get_column(0)));
@@ -281,8 +282,8 @@ struct scatter_gather_functor {
cudf::size_type const* block_offsets,
Filter filter,
cudf::size_type per_thread,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0)
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
CUDF_FAIL("fixed_point type not supported for this operation yet");
}
@@ -309,8 +310,8 @@ template <typename Filter>
std::unique_ptr<table> copy_if(
table_view const& input,
Filter filter,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0)
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
CUDF_FUNC_RANGE();

@@ -326,12 +327,12 @@ std::unique_ptr<table> copy_if(
rmm::device_uvector<cudf::size_type> block_offsets(grid.num_blocks + 1, stream);

// 1. Find the count of elements in each block that "pass" the mask
compute_block_counts<Filter, block_size><<<grid.num_blocks, block_size, 0, stream>>>(
compute_block_counts<Filter, block_size><<<grid.num_blocks, block_size, 0, stream.value()>>>(
block_counts.begin(), input.num_rows(), per_thread, filter);

// initialize just the first element of block_offsets to 0 since the InclusiveSum below
// starts at the second element.
CUDA_TRY(cudaMemsetAsync(block_offsets.begin(), 0, sizeof(cudf::size_type), stream));
CUDA_TRY(cudaMemsetAsync(block_offsets.begin(), 0, sizeof(cudf::size_type), stream.value()));

// 2. Find the offset for each block's output using a scan of block counts
if (grid.num_blocks > 1) {
@@ -342,7 +343,7 @@ std::unique_ptr<table> copy_if(
block_counts.begin(),
block_offsets.begin() + 1,
grid.num_blocks,
stream);
stream.value());
rmm::device_buffer d_temp_storage(temp_storage_bytes, stream);

// Run exclusive prefix sum
Expand All @@ -351,7 +352,7 @@ std::unique_ptr<table> copy_if(
block_counts.begin(),
block_offsets.begin() + 1,
grid.num_blocks,
stream);
stream.value());
}

// As it is InclusiveSum, last value in block_offsets will be output_size
@@ -362,9 +363,9 @@ std::unique_ptr<table> copy_if(
grid.num_blocks > 1 ? block_offsets.begin() + grid.num_blocks : block_counts.begin(),
sizeof(cudf::size_type),
cudaMemcpyDefault,
stream));
stream.value()));

CUDA_TRY(cudaStreamSynchronize(stream));
stream.synchronize();

if (output_size == input.num_rows()) {
return std::make_unique<table>(input, stream, mr);
@@ -378,8 +379,8 @@ std::unique_ptr<table> copy_if(
block_offsets.begin(),
filter,
per_thread,
mr,
stream);
stream,
mr);
});

return std::make_unique<table>(std::move(out_columns));
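
copy_if shows where the raw handle still surfaces: CUDA runtime calls, kernel launches, and cub all take `stream.value()`, while host-side waits become `stream.synchronize()`. A distilled sketch with a hypothetical kernel:

```cpp
#include <cudf/utilities/error.hpp>  // CUDA_TRY

#include <rmm/cuda_stream_view.hpp>

__global__ void count_passing(int* count, int n);  // hypothetical kernel

void run_count(int* count, int n, rmm::cuda_stream_view stream)
{
  CUDA_TRY(cudaMemsetAsync(count, 0, sizeof(int), stream.value()));
  count_passing<<<1, 256, 0, stream.value()>>>(count, n);
  stream.synchronize();  // replaces cudaStreamSynchronize(stream)
}
```
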
16 changes: 9 additions & 7 deletions cpp/include/cudf/detail/copy_range.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <cub/cub.cuh>
@@ -134,7 +136,7 @@ void copy_range(SourceValueIterator source_value_begin,
mutable_column_view& target,
size_type target_begin,
size_type target_end,
cudaStream_t stream = 0)
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
{
CUDF_EXPECTS((target_begin <= target_end) && (target_begin >= 0) &&
(target_begin < target.size()) && (target_end <= target.size()),
@@ -162,7 +164,7 @@

auto kernel =
copy_range_kernel<block_size, SourceValueIterator, SourceValidityIterator, T, true>;
kernel<<<grid.num_blocks, block_size, 0, stream>>>(
kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
source_value_begin,
source_validity_begin,
*mutable_column_device_view::create(target, stream),
@@ -174,7 +176,7 @@
} else {
auto kernel =
copy_range_kernel<block_size, SourceValueIterator, SourceValidityIterator, T, false>;
kernel<<<grid.num_blocks, block_size, 0, stream>>>(
kernel<<<grid.num_blocks, block_size, 0, stream.value()>>>(
source_value_begin,
source_validity_begin,
*mutable_column_device_view::create(target, stream),
@@ -195,7 +197,7 @@ void copy_range_in_place(column_view const& source,
size_type source_begin,
size_type source_end,
size_type target_begin,
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::copy_range
@@ -208,8 +210,8 @@ std::unique_ptr<column> copy_range(
size_type source_begin,
size_type source_end,
size_type target_begin,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(),
cudaStream_t stream = 0);
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace detail
} // namespace cudf
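
A usage sketch of the reordered `detail::copy_range` above; the index values are illustrative:

```cpp
// Copy source[0, source.size()) into a copy of target starting at offset 0,
// with all device work ordered on `stream`.
std::unique_ptr<cudf::column> overwrite_prefix(cudf::column_view const& source,
                                               cudf::column_view const& target,
                                               rmm::cuda_stream_view stream)
{
  return cudf::detail::copy_range(source, target,
                                  /*source_begin=*/0,
                                  /*source_end=*/source.size(),
                                  /*target_begin=*/0,
                                  stream);
}
```
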