Skip to content

Commit

Permalink
Support Scatter struct_scalar (#8630)
Browse files Browse the repository at this point in the history
Partially addresses #8558

This PR adds support for scattering struct scalars.

Implementation notes:
Current implementation is based on copying the row data from each field as a new scalar and recursively calls scalar scattering on each field. There maybe an optimization on eliminating such copy. But will require extra scaffolding/scatter machinery.

Minor aspects of this PR:
- Refactors `column_scalar_scatterer` to include `scatter_scalar_bitmask_inplace` in each level of dispatch. This is required because scalar scattering can be nested.
- Adds `count_set_bit` and `count_unset_bit` detail APIs
- Adds default stream/mr for `detail::get_element`

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - Conor Hoekstra (https://github.com/codereport)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Gera Shegalov (https://github.com/gerashegalov)

URL: #8630
  • Loading branch information
isVoid authored Jul 9, 2021
1 parent 214d74a commit 8bd0dfe
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 42 deletions.
9 changes: 5 additions & 4 deletions cpp/include/cudf/detail/copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,10 @@ std::unique_ptr<table> sample(
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<scalar> get_element(column_view const& input,
size_type index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
std::unique_ptr<scalar> get_element(
column_view const& input,
size_type index,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
} // namespace detail
} // namespace cudf
20 changes: 20 additions & 0 deletions cpp/include/cudf/detail/null_mask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ void set_null_mask(bitmask_type* bitmask,
bool valid,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::count_set_bits
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
cudf::size_type count_set_bits(bitmask_type const* bitmask,
size_type start,
size_type stop,
rmm::cuda_stream_view stream);

/**
* @copydoc cudf::count_unset_bits
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
cudf::size_type count_unset_bits(bitmask_type const* bitmask,
size_type start,
size_type stop,
rmm::cuda_stream_view stream);

/**
* @copydoc cudf::segmented_count_set_bits
*
Expand Down
127 changes: 89 additions & 38 deletions cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cudf/detail/gather.cuh>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/indexalator.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/scatter.cuh>
Expand All @@ -27,6 +28,7 @@
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/dictionary/detail/search.hpp>
#include <cudf/lists/list_view.cuh>
#include <cudf/scalar/scalar_factories.hpp>
#include <cudf/strings/detail/scatter.cuh>
#include <cudf/strings/string_view.cuh>
#include <cudf/structs/struct_view.hpp>
Expand Down Expand Up @@ -63,32 +65,32 @@ __global__ void marking_bitmask_kernel(mutable_column_device_view destination,
}

template <typename MapIterator>
void scatter_scalar_bitmask(std::vector<std::reference_wrapper<const scalar>> const& source,
MapIterator scatter_map,
size_type num_scatter_rows,
std::vector<std::unique_ptr<column>>& target,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
void scatter_scalar_bitmask_inplace(std::reference_wrapper<const scalar> const& source,
MapIterator scatter_map,
size_type num_scatter_rows,
column& target,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
constexpr size_type block_size = 256;
size_type const grid_size = grid_1d(num_scatter_rows, block_size).num_blocks;

for (size_t i = 0; i < target.size(); ++i) {
auto const source_is_valid = source[i].get().is_valid(stream);
if (target[i]->nullable() or not source_is_valid) {
if (not target[i]->nullable()) {
// Target must have a null mask if the source is not valid
auto mask = detail::create_null_mask(target[i]->size(), mask_state::ALL_VALID, stream, mr);
target[i]->set_null_mask(std::move(mask), 0);
}

auto target_view = mutable_column_device_view::create(target[i]->mutable_view(), stream);

auto bitmask_kernel = source_is_valid ? marking_bitmask_kernel<true, decltype(scatter_map)>
: marking_bitmask_kernel<false, decltype(scatter_map)>;
bitmask_kernel<<<grid_size, block_size, 0, stream.value()>>>(
*target_view, scatter_map, num_scatter_rows);
auto const source_is_valid = source.get().is_valid(stream);
if (target.nullable() or not source_is_valid) {
if (not target.nullable()) {
// Target must have a null mask if the source is not valid
auto mask = detail::create_null_mask(target.size(), mask_state::ALL_VALID, stream, mr);
target.set_null_mask(std::move(mask), 0);
}

auto target_view = mutable_column_device_view::create(target, stream);

auto bitmask_kernel = source_is_valid ? marking_bitmask_kernel<true, decltype(scatter_map)>
: marking_bitmask_kernel<false, decltype(scatter_map)>;
bitmask_kernel<<<grid_size, block_size, 0, stream.value()>>>(
*target_view, scatter_map, num_scatter_rows);

target.set_null_count(count_unset_bits(target.view().null_mask(), 0, target.size(), stream));
}
}

Expand All @@ -103,6 +105,7 @@ struct column_scalar_scatterer_impl {
{
CUDF_EXPECTS(source.get().type() == target.type(), "scalar and column types must match");

// make a copy of data and null mask from source
auto result = std::make_unique<column>(target, stream, mr);
auto result_view = result->mutable_view();

Expand All @@ -117,6 +120,7 @@ struct column_scalar_scatterer_impl {
scatter_iter,
result_view.begin<Element>());

scatter_scalar_bitmask_inplace(source, scatter_iter, scatter_rows, *result, stream, mr);
return result;
}
};
Expand All @@ -136,7 +140,10 @@ struct column_scalar_scatterer_impl<string_view, MapIterator> {
auto const source_view = string_view(scalar_impl->data(), scalar_impl->size());
auto const begin = thrust::make_constant_iterator(source_view);
auto const end = begin + scatter_rows;
return strings::detail::scatter(begin, end, scatter_iter, target, stream, mr);
auto result = strings::detail::scatter(begin, end, scatter_iter, target, stream, mr);

scatter_scalar_bitmask_inplace(source, scatter_iter, scatter_rows, *result, stream, mr);
return result;
}
};

Expand All @@ -149,17 +156,11 @@ struct column_scalar_scatterer_impl<list_view, MapIterator> {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const
{
return lists::detail::scatter(
source, scatter_iter, scatter_iter + scatter_rows, target, stream, mr);
}
};
auto result =
lists::detail::scatter(source, scatter_iter, scatter_iter + scatter_rows, target, stream, mr);

template <typename MapIterator>
struct column_scalar_scatterer_impl<struct_view, MapIterator> {
template <typename... Args>
std::unique_ptr<column> operator()(Args&&...) const
{
CUDF_FAIL("scatter scalar to struct_view not implemented");
scatter_scalar_bitmask_inplace(source, scatter_iter, scatter_rows, *result, stream, mr);
return result;
}
};

Expand Down Expand Up @@ -200,10 +201,13 @@ struct column_scalar_scatterer_impl<dictionary32, MapIterator> {
// use the keys from the matched column
std::unique_ptr<column> keys_column(std::move(dict_target->release().children.back()));
// create the output column
return make_dictionary_column(std::move(keys_column),
std::move(indices_column),
std::move(*(contents.null_mask.release())),
null_count);
auto result = make_dictionary_column(std::move(keys_column),
std::move(indices_column),
std::move(*(contents.null_mask.release())),
null_count);

scatter_scalar_bitmask_inplace(source, scatter_iter, scatter_rows, *result, stream, mr);
return result;
}
};

Expand All @@ -222,6 +226,55 @@ struct column_scalar_scatterer {
}
};

template <typename MapIterator>
struct column_scalar_scatterer_impl<struct_view, MapIterator> {
std::unique_ptr<column> operator()(std::reference_wrapper<const scalar> const& source,
MapIterator scatter_iter,
size_type scatter_rows,
column_view const& target,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const
{
// For each field of `source`, copy construct a scalar from the field
// and dispatch to the corresponding scalar scatterer

auto typed_s = static_cast<struct_scalar const*>(&source.get());
size_type const n_fields = typed_s->view().num_columns();
CUDF_EXPECTS(n_fields == target.num_children(), "Mismatched number of fields.");

auto scatter_functor = column_scalar_scatterer<decltype(scatter_iter)>{};
auto fields_iter_begin = make_counting_transform_iterator(0, [&](auto const& i) {
auto row_slr = get_element(typed_s->view().column(i), 0, stream);
return type_dispatcher<dispatch_storage_type>(row_slr->type(),
scatter_functor,
*row_slr,
scatter_iter,
scatter_rows,
target.child(i),
stream,
mr);
});
std::vector<std::unique_ptr<column>> fields(fields_iter_begin, fields_iter_begin + n_fields);

// Compute null mask
rmm::device_buffer null_mask =
target.nullable() ? copy_bitmask(target, stream, mr)
: create_null_mask(target.size(), mask_state::UNALLOCATED, stream, mr);
column null_mask_stub(data_type{type_id::STRUCT},
target.size(),
rmm::device_buffer{},
std::move(null_mask),
target.null_count());
scatter_scalar_bitmask_inplace(source, scatter_iter, scatter_rows, null_mask_stub, stream, mr);
size_type null_count = null_mask_stub.null_count();
auto contents = null_mask_stub.release();

// Null mask pushdown inside factory method
return make_structs_column(
target.size(), std::move(fields), null_count, std::move(*contents.null_mask));
}
};

} // namespace

std::unique_ptr<table> scatter(table_view const& source,
Expand Down Expand Up @@ -305,8 +358,6 @@ std::unique_ptr<table> scatter(std::vector<std::reference_wrapper<const scalar>>
mr);
});

scatter_scalar_bitmask(source, scatter_iter, scatter_rows, result, stream, mr);

return std::make_unique<table>(std::move(result));
}

Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ ConfigureTest(COPYING_TEST
copying/scatter_list_tests.cpp
copying/scatter_list_scalar_tests.cpp
copying/scatter_struct_tests.cpp
copying/scatter_struct_scalar_tests.cpp
copying/segmented_gather_list_tests.cpp
copying/shift_tests.cpp
copying/slice_tests.cpp
Expand Down
Loading

0 comments on commit 8bd0dfe

Please sign in to comment.