Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose streams in all public copying APIs #13629

Merged
merged 10 commits into from
Jul 13, 2023
6 changes: 5 additions & 1 deletion cpp/benchmarks/lists/copying/scatter_lists.cu
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ void BM_lists_scatter(::benchmark::State& state)

for (auto _ : state) {
cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0
scatter(cudf::table_view{{*source}}, *scatter_map, cudf::table_view{{*target}}, mr);
scatter(cudf::table_view{{*source}},
*scatter_map,
cudf::table_view{{*target}},
cudf::get_default_stream(),
mr);
}

state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) * state.range(0) * 2 *
Expand Down
88 changes: 74 additions & 14 deletions cpp/include/cudf/copying.hpp

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/dictionary/encode.hpp>
Expand All @@ -37,6 +37,7 @@
#include <cudf_test/default_stream.hpp>

#include <rmm/device_buffer.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <thrust/copy.h>
#include <thrust/functional.h>
Expand Down Expand Up @@ -1595,7 +1596,12 @@ class lists_column_wrapper : public detail::column_wrapper {
thrust::copy_if(
std::cbegin(cols), std::cend(cols), valids, std::back_inserter(children), thrust::identity{});

auto data = children.empty() ? cudf::empty_like(expected_hierarchy) : concatenate(children);
// TODO: Once the public concatenate API exposes streams, use that instead.
vyasr marked this conversation as resolved.
Show resolved Hide resolved
auto data =
children.empty()
? cudf::empty_like(expected_hierarchy)
: cudf::detail::concatenate(
children, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource());
bdice marked this conversation as resolved.
Show resolved Hide resolved

// increment depth
depth = expected_depth + 1;
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/copying/copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,19 +175,21 @@ std::unique_ptr<table> empty_like(table_view const& input_table)

std::unique_ptr<column> allocate_like(column_view const& input,
mask_allocation_policy mask_alloc,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::allocate_like(input, input.size(), mask_alloc, cudf::get_default_stream(), mr);
return detail::allocate_like(input, input.size(), mask_alloc, stream, mr);
}

std::unique_ptr<column> allocate_like(column_view const& input,
size_type size,
mask_allocation_policy mask_alloc,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::allocate_like(input, size, mask_alloc, cudf::get_default_stream(), mr);
return detail::allocate_like(input, size, mask_alloc, stream, mr);
}

} // namespace cudf
12 changes: 8 additions & 4 deletions cpp/src/copying/copy.cu
Original file line number Diff line number Diff line change
Expand Up @@ -411,37 +411,41 @@ std::unique_ptr<column> copy_if_else(scalar const& lhs,
std::unique_ptr<column> copy_if_else(column_view const& lhs,
column_view const& rhs,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::copy_if_else(lhs, rhs, boolean_mask, cudf::get_default_stream(), mr);
return detail::copy_if_else(lhs, rhs, boolean_mask, stream, mr);
}

std::unique_ptr<column> copy_if_else(scalar const& lhs,
column_view const& rhs,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::copy_if_else(lhs, rhs, boolean_mask, cudf::get_default_stream(), mr);
return detail::copy_if_else(lhs, rhs, boolean_mask, stream, mr);
}

std::unique_ptr<column> copy_if_else(column_view const& lhs,
scalar const& rhs,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::copy_if_else(lhs, rhs, boolean_mask, cudf::get_default_stream(), mr);
return detail::copy_if_else(lhs, rhs, boolean_mask, stream, mr);
}

std::unique_ptr<column> copy_if_else(scalar const& lhs,
scalar const& rhs,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::copy_if_else(lhs, rhs, boolean_mask, cudf::get_default_stream(), mr);
return detail::copy_if_else(lhs, rhs, boolean_mask, stream, mr);
}

} // namespace cudf
9 changes: 5 additions & 4 deletions cpp/src/copying/copy_range.cu
Original file line number Diff line number Diff line change
Expand Up @@ -272,23 +272,24 @@ void copy_range_in_place(column_view const& source,
mutable_column_view& target,
size_type source_begin,
size_type source_end,
size_type target_begin)
size_type target_begin,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
return detail::copy_range_in_place(
source, target, source_begin, source_end, target_begin, cudf::get_default_stream());
source, target, source_begin, source_end, target_begin, stream);
}

std::unique_ptr<column> copy_range(column_view const& source,
column_view const& target,
size_type source_begin,
size_type source_end,
size_type target_begin,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::copy_range(
source, target, source_begin, source_end, target_begin, cudf::get_default_stream(), mr);
return detail::copy_range(source, target, source_begin, source_end, target_begin, stream, mr);
}

} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/copying/gather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ std::unique_ptr<table> gather(table_view const& source_table,
std::unique_ptr<table> gather(table_view const& source_table,
column_view const& gather_map,
out_of_bounds_policy bounds_policy,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

auto index_policy = is_unsigned(gather_map.type()) ? detail::negative_index_policy::NOT_ALLOWED
: detail::negative_index_policy::ALLOWED;

return detail::gather(
source_table, gather_map, bounds_policy, index_policy, cudf::get_default_stream(), mr);
return detail::gather(source_table, gather_map, bounds_policy, index_policy, stream, mr);
}

} // namespace cudf
3 changes: 2 additions & 1 deletion cpp/src/copying/get_element.cu
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ std::unique_ptr<scalar> get_element(column_view const& input,

std::unique_ptr<scalar> get_element(column_view const& input,
size_type index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::get_element(input, index, cudf::get_default_stream(), mr);
return detail::get_element(input, index, stream, mr);
}

} // namespace cudf
7 changes: 4 additions & 3 deletions cpp/src/copying/purge_nonempty_nulls.cu
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,19 @@ bool may_have_nonempty_nulls(column_view const& input)
/**
* @copydoc cudf::has_nonempty_nulls
*/
bool has_nonempty_nulls(column_view const& input)
bool has_nonempty_nulls(column_view const& input, rmm::cuda_stream_view stream)
{
return detail::has_nonempty_nulls(input, cudf::get_default_stream());
return detail::has_nonempty_nulls(input, stream);
}

/**
* @copydoc cudf::purge_nonempty_nulls(column_view const&, rmm::cuda_stream_view, rmm::mr::device_memory_resource*)
*/
std::unique_ptr<cudf::column> purge_nonempty_nulls(column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::purge_nonempty_nulls(input, cudf::get_default_stream(), mr);
return detail::purge_nonempty_nulls(input, stream, mr);
}

} // namespace cudf
14 changes: 9 additions & 5 deletions cpp/src/copying/reverse.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,20 +50,24 @@ std::unique_ptr<column> reverse(column_view const& source_column,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return std::move(cudf::reverse(table_view({source_column}))->release().front());
return std::move(
cudf::detail::reverse(table_view({source_column}), stream, mr)->release().front());
}
} // namespace detail

std::unique_ptr<table> reverse(table_view const& source_table, rmm::mr::device_memory_resource* mr)
std::unique_ptr<table> reverse(table_view const& source_table,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::reverse(source_table, cudf::get_default_stream(), mr);
return detail::reverse(source_table, stream, mr);
}

std::unique_ptr<column> reverse(column_view const& source_column,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::reverse(source_column, cudf::get_default_stream(), mr);
return detail::reverse(source_column, stream, mr);
}
} // namespace cudf
5 changes: 3 additions & 2 deletions cpp/src/copying/sample.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,9 +90,10 @@ std::unique_ptr<table> sample(table_view const& input,
size_type const n,
sample_with_replacement replacement,
int64_t const seed,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::sample(input, n, replacement, seed, cudf::get_default_stream(), mr);
return detail::sample(input, n, replacement, seed, stream, mr);
}
} // namespace cudf
18 changes: 11 additions & 7 deletions cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ struct column_scalar_scatterer_impl<struct_view, MapIterator> {

auto scatter_functor = column_scalar_scatterer<decltype(scatter_iter)>{};
auto fields_iter_begin = make_counting_transform_iterator(0, [&](auto const& i) {
auto row_slr =
get_element(typed_s->view().column(i), 0, stream, rmm::mr::get_current_device_resource());
auto row_slr = detail::get_element(
typed_s->view().column(i), 0, stream, rmm::mr::get_current_device_resource());
return type_dispatcher<dispatch_storage_type>(row_slr->type(),
scatter_functor,
*row_slr,
Expand Down Expand Up @@ -328,7 +328,7 @@ std::unique_ptr<table> scatter(table_view const& source,
scatter_map.data(),
nullptr,
0);
return scatter(source, map_col, target, stream, mr);
return detail::scatter(source, map_col, target, stream, mr);
}

std::unique_ptr<table> scatter(std::vector<std::reference_wrapper<scalar const>> const& source,
Expand Down Expand Up @@ -495,38 +495,42 @@ std::unique_ptr<table> boolean_mask_scatter(
std::unique_ptr<table> scatter(table_view const& source,
column_view const& scatter_map,
table_view const& target,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::scatter(source, scatter_map, target, cudf::get_default_stream(), mr);
return detail::scatter(source, scatter_map, target, stream, mr);
}

std::unique_ptr<table> scatter(std::vector<std::reference_wrapper<scalar const>> const& source,
column_view const& indices,
table_view const& target,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::scatter(source, indices, target, cudf::get_default_stream(), mr);
return detail::scatter(source, indices, target, stream, mr);
}

std::unique_ptr<table> boolean_mask_scatter(table_view const& input,
table_view const& target,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::boolean_mask_scatter(input, target, boolean_mask, cudf::get_default_stream(), mr);
return detail::boolean_mask_scatter(input, target, boolean_mask, stream, mr);
}

std::unique_ptr<table> boolean_mask_scatter(
std::vector<std::reference_wrapper<scalar const>> const& input,
table_view const& target,
column_view const& boolean_mask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::boolean_mask_scatter(input, target, boolean_mask, cudf::get_default_stream(), mr);
return detail::boolean_mask_scatter(input, target, boolean_mask, stream, mr);
}

} // namespace cudf
3 changes: 2 additions & 1 deletion cpp/src/copying/shift.cu
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ std::unique_ptr<column> shift(column_view const& input,
std::unique_ptr<column> shift(column_view const& input,
size_type offset,
scalar const& fill_value,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::shift(input, offset, fill_value, cudf::get_default_stream(), mr);
return detail::shift(input, offset, fill_value, stream, mr);
}

} // namespace cudf
28 changes: 18 additions & 10 deletions cpp/src/copying/slice.cu
Original file line number Diff line number Diff line change
Expand Up @@ -136,40 +136,48 @@ std::vector<column_view> slice(column_view const& input,
std::initializer_list<size_type> indices,
rmm::cuda_stream_view stream)
{
return slice(input, host_span<size_type const>(indices.begin(), indices.size()), stream);
return detail::slice(input, host_span<size_type const>(indices.begin(), indices.size()), stream);
}

std::vector<table_view> slice(table_view const& input,
std::initializer_list<size_type> indices,
rmm::cuda_stream_view stream)
{
return slice(input, host_span<size_type const>(indices.begin(), indices.size()), stream);
return detail::slice(input, host_span<size_type const>(indices.begin(), indices.size()), stream);
};

} // namespace detail

std::vector<column_view> slice(column_view const& input, host_span<size_type const> indices)
std::vector<column_view> slice(column_view const& input,
host_span<size_type const> indices,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
return detail::slice(input, indices, cudf::get_default_stream());
return detail::slice(input, indices, stream);
}

std::vector<table_view> slice(table_view const& input, host_span<size_type const> indices)
std::vector<table_view> slice(table_view const& input,
host_span<size_type const> indices,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
return detail::slice(input, indices, cudf::get_default_stream());
return detail::slice(input, indices, stream);
};

std::vector<column_view> slice(column_view const& input, std::initializer_list<size_type> indices)
std::vector<column_view> slice(column_view const& input,
std::initializer_list<size_type> indices,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
return detail::slice(input, indices, cudf::get_default_stream());
return detail::slice(input, indices, stream);
}

std::vector<table_view> slice(table_view const& input, std::initializer_list<size_type> indices)
std::vector<table_view> slice(table_view const& input,
std::initializer_list<size_type> indices,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
return detail::slice(input, indices, cudf::get_default_stream());
return detail::slice(input, indices, stream);
};

} // namespace cudf
Loading