Skip to content

Commit

Permalink
Move rank scan implementations from scan_inclusive.cu to rank_scan.cu (
Browse files Browse the repository at this point in the history
…#9351)

This change was mainly to improve the compile time for `reductions/scan/scan_inclusive.cu` by refactoring out the rank-scan functions into a separate file `rank_scan.cu`. Although the overall compile time improvement for `scan_inclusive.cu` is only 25%, the source code is better organized with this change. The code function has not changed. 

The detail `inclusive_rank_scan` and `inclusive_dense_rank_scan` declarations were moved from `src/reductions/scan/scan.cuh` to `include/cudf/detail/scan.hpp`, and dispatching of the RANK and DENSE_RANK aggregations is now done in `scan.cpp` instead of being handled by `scan_inclusive.cu` and `scan_exclusive.cu` (the latter just throws an exception anyway).

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #9351
  • Loading branch information
davidwendt authored Oct 5, 2021
1 parent fb18491 commit 122da20
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 119 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ add_library(cudf
src/reductions/nth_element.cu
src/reductions/product.cu
src/reductions/reductions.cpp
src/reductions/scan/rank_scan.cu
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
Expand Down
24 changes: 24 additions & 0 deletions cpp/include/cudf/detail/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,29 @@ std::unique_ptr<column> scan_inclusive(column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row ranks for a column
*
* Each row is compared for equality with the row directly before it. A row that differs from
* its predecessor (and row 0) receives its 1-based row position; every other row repeats the
* value of the row that started its run of equal values (inclusive max-scan).
* Only adjacent rows are compared, so the input presumably has to be pre-sorted for the result
* to be a true rank -- TODO confirm with callers.
*
* @throws cudf::logic_error (via `CUDF_EXPECTS`) if `order_by` is, or contains, a list column
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Column of `size_type` rank values, one per row of `order_by`, without a null mask
*/
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row dense ranks for a column
*
* Each row is compared for equality with the row directly before it. A row that differs from
* its predecessor (and row 0) contributes 1, every other row contributes 0, and the result is
* the inclusive running sum of those contributions -- i.e. consecutive integers starting at 1
* that increase by one at every change of value.
* Only adjacent rows are compared, so the input presumably has to be pre-sorted for the result
* to be a true dense rank -- TODO confirm with callers.
*
* @throws cudf::logic_error (via `CUDF_EXPECTS`) if `order_by` is, or contains, a list column
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return Column of `size_type` dense rank values, one per row of `order_by`, without a null mask
*/
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

} // namespace detail
} // namespace cudf
130 changes: 130 additions & 0 deletions cpp/src/reductions/scan/rank_scan.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <structs/utilities.hpp>

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/table/row_operators.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/scan.h>
#include <thrust/tabulate.h>

namespace cudf {
namespace detail {
namespace {

/**
 * @brief Build a rank column by flagging run boundaries in `order_by` and scanning the flags
 *
 * Every row is tested for equality against the row directly before it. The outcome of that test
 * (row 0 always counts as the start of a new run) is handed to `resolver` together with the row
 * index; the values `resolver` returns are written to the output and then combined in place by
 * an inclusive scan with `scan_op`.
 *
 * @tparam has_nulls      Forwarded to `row_equality_comparator`; set by callers when the
 *                        `order_by` column has nulls
 * @tparam value_resolver Device callable invoked as `resolver(bool starts_new_run, size_type row)`
 * @tparam scan_operator  Binary device functor used by the inclusive scan
 * @param order_by Input column to generate ranks for
 * @param resolver Maps the run-start flag and row index to the value that gets scanned
 * @param scan_op  Scan operation applied to the resolved values
 * @param stream   CUDA stream used for device memory operations and kernel launches
 * @param mr       Device memory resource used to allocate the returned column's device memory
 *                 (it is also handed to `superimpose_parent_nulls`)
 * @return std::unique_ptr<column> `size_type` rank values, one per input row, without a null mask
 */
template <bool has_nulls, typename value_resolver, typename scan_operator>
std::unique_ptr<column> rank_generator(column_view const& order_by,
                                       value_resolver resolver,
                                       scan_operator scan_op,
                                       rmm::cuda_stream_view stream,
                                       rmm::mr::device_memory_resource* mr)
{
  // Both helper results are kept as named locals for the whole function: only element 0 of each
  // is used directly, but the views built below are derived from them.
  auto const with_parent_nulls = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
  table_view const key_table{{std::get<0>(with_parent_nulls)}};
  auto const flat_keys = cudf::structs::detail::flatten_nested_columns(
    key_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
  auto const d_keys = table_device_view::create(std::get<0>(flat_keys), stream);

  // The same table is used on both sides because a row is compared with its own predecessor.
  // NOTE(review): the trailing `true` is presumably `nulls_are_equal` -- confirm in
  // row_operators.cuh.
  row_equality_comparator<has_nulls> rows_equal(*d_keys, *d_keys, true);

  auto ranks = make_fixed_width_column(
    data_type{type_to_id<size_type>()}, key_table.num_rows(), mask_state::UNALLOCATED, stream, mr);
  auto ranks_view        = ranks->mutable_view();
  auto const ranks_begin = ranks_view.begin<size_type>();
  auto const ranks_end   = ranks_view.end<size_type>();

  // Step 1: one value per row, derived from "does this row start a new run of equal rows?".
  // The `idx == 0` test short-circuits so row 0 is never compared with a non-existent row -1.
  thrust::tabulate(rmm::exec_policy(stream),
                   ranks_begin,
                   ranks_end,
                   [rows_equal, resolver] __device__(size_type idx) {
                     bool const starts_new_run = (idx == 0) || !rows_equal(idx, idx - 1);
                     return resolver(starts_new_run, idx);
                   });

  // Step 2: in-place inclusive scan turns the per-row values into ranks.
  thrust::inclusive_scan(rmm::exec_policy(stream), ranks_begin, ranks_end, ranks_begin, scan_op);

  return ranks;
}

} // namespace

/**
 * @brief Dense rank of each row of `order_by`, computed as a running count of run starts
 *
 * `rank_generator` flags every row that differs from the row before it (row 0 included); the
 * resolver below passes that flag through unchanged and `DeviceSum` accumulates it, so the
 * result starts at 1 and grows by one at every change of value.
 *
 * @throws cudf::logic_error (via `CUDF_EXPECTS`) if `order_by` is, or contains, a list column
 *
 * @param order_by Input column to generate dense ranks for
 * @param stream   CUDA stream used for device memory operations and kernel launches
 * @param mr       Device memory resource used to allocate the returned column's device memory
 * @return Column of `size_type` dense rank values
 */
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
                                                  rmm::cuda_stream_view stream,
                                                  rmm::mr::device_memory_resource* mr)
{
  CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
               "Unsupported list type in dense_rank scan.");

  // The flag itself is the value to sum; the row index is not needed for dense ranks.
  auto const run_start_flag = [] __device__(bool starts_new_run, auto row_index) {
    return starts_new_run;
  };

  // Choose the comparator flavour once, based on whether any (nested) nulls are present.
  bool const nulls_present = has_nested_nulls(table_view{{order_by}});
  return nulls_present
           ? rank_generator<true>(order_by, run_start_flag, DeviceSum{}, stream, mr)
           : rank_generator<false>(order_by, run_start_flag, DeviceSum{}, stream, mr);
}

/**
 * @brief Rank of each row of `order_by`, computed as a running maximum of run-start positions
 *
 * `rank_generator` flags every row that differs from the row before it (row 0 included). The
 * resolver below turns a flagged row into its 1-based position and any other row into 0;
 * `DeviceMax` then carries the most recent run-start position forward, so all rows of a run of
 * equal values share the position of the run's first row.
 *
 * @throws cudf::logic_error (via `CUDF_EXPECTS`) if `order_by` is, or contains, a list column
 *
 * @param order_by Input column to generate ranks for
 * @param stream   CUDA stream used for device memory operations and kernel launches
 * @param mr       Device memory resource used to allocate the returned column's device memory
 * @return Column of `size_type` rank values
 */
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
                                            rmm::cuda_stream_view stream,
                                            rmm::mr::device_memory_resource* mr)
{
  CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
               "Unsupported list type in rank scan.");

  // 1-based position for the first row of a run, 0 (neutral for max) for every other row.
  auto const run_start_position = [] __device__(bool starts_new_run, auto row_index) {
    return starts_new_run ? row_index + 1 : 0;
  };

  // Choose the comparator flavour once, based on whether any (nested) nulls are present.
  bool const nulls_present = has_nested_nulls(table_view{{order_by}});
  return nulls_present
           ? rank_generator<true>(order_by, run_start_position, DeviceMax{}, stream, mr)
           : rank_generator<false>(order_by, run_start_position, DeviceMax{}, stream, mr);
}

} // namespace detail
} // namespace cudf
12 changes: 12 additions & 0 deletions cpp/src/reductions/scan/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ std::unique_ptr<column> scan(column_view const& input,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

if (agg->kind == aggregation::RANK) {
CUDF_EXPECTS(inclusive == scan_type::INCLUSIVE,
"Unsupported rank aggregation operator for exclusive scan");
return inclusive_rank_scan(input, rmm::cuda_stream_default, mr);
}
if (agg->kind == aggregation::DENSE_RANK) {
CUDF_EXPECTS(inclusive == scan_type::INCLUSIVE,
"Unsupported dense rank aggregation operator for exclusive scan");
return inclusive_dense_rank_scan(input, rmm::cuda_stream_default, mr);
}

return inclusive == scan_type::EXCLUSIVE
? detail::scan_exclusive(input, agg, null_handling, rmm::cuda_stream_default, mr)
: detail::scan_inclusive(input, agg, null_handling, rmm::cuda_stream_default, mr);
Expand Down
18 changes: 3 additions & 15 deletions cpp/src/reductions/scan/scan.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,16 @@ rmm::device_buffer mask_scan(column_view const& input_view,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

// Rank scan entry point (declaration removed from this header by this commit; it now lives in
// include/cudf/detail/scan.hpp).
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

// Dense rank scan entry point (declaration removed from this header by this commit; it now lives
// in include/cudf/detail/scan.hpp).
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

template <template <typename> typename DispatchFn>
std::unique_ptr<column> scan_agg_dispatch(const column_view& input,
std::unique_ptr<aggregation> const& agg,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (agg->kind != aggregation::RANK && agg->kind != aggregation::DENSE_RANK) {
CUDF_EXPECTS(
is_numeric(input.type()) || is_compound(input.type()) || is_fixed_point(input.type()),
"Unexpected non-numeric or non-string type.");
}
CUDF_EXPECTS(
is_numeric(input.type()) || is_compound(input.type()) || is_fixed_point(input.type()),
"Unexpected non-numeric or non-string type.");

switch (agg->kind) {
case aggregation::SUM:
Expand All @@ -70,8 +60,6 @@ std::unique_ptr<column> scan_agg_dispatch(const column_view& input,
if (is_fixed_point(input.type())) CUDF_FAIL("decimal32/64 cannot support product scan");
return type_dispatcher<dispatch_storage_type>(
input.type(), DispatchFn<DeviceProduct>(), input, null_handling, stream, mr);
case aggregation::RANK: return inclusive_rank_scan(input, stream, mr);
case aggregation::DENSE_RANK: return inclusive_dense_rank_scan(input, stream, mr);
default: CUDF_FAIL("Unsupported aggregation operator for scan");
}
}
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/reductions/scan/scan_exclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ std::unique_ptr<column> scan_exclusive(const column_view& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(agg->kind != aggregation::RANK && agg->kind != aggregation::DENSE_RANK,
"Unsupported rank aggregation operator for exclusive scan");
auto output = scan_agg_dispatch<scan_dispatcher>(input, agg, null_handling, stream, mr);

if (null_handling == null_policy::EXCLUDE) {
Expand Down
102 changes: 1 addition & 101 deletions cpp/src/reductions/scan/scan_inclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@
#include "scan.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/reduction.hpp>
#include <cudf/strings/detail/gather.cuh>
#include <cudf/table/row_operators.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <structs/utilities.hpp>

#include <thrust/scan.h>

namespace cudf {
Expand Down Expand Up @@ -197,101 +192,8 @@ struct scan_dispatcher {
}
};

/**
* @brief generate row ranks or dense ranks using a row comparison then scan the results
*
* Each row is compared for equality with the row directly before it; the negated result (row 0
* always counts as "different") is passed to `resolver` together with the row index, and the
* resolved values are scanned in place with `scan_op`.
*
* @tparam has_nulls if the order_by column has nulls
* @tparam value_resolver flag value resolver with boolean first and row number arguments
* @tparam scan_operator scan function ran on the flag values
* @param order_by input column to generate ranks for
* @param resolver flag value resolver
* @param scan_op scan operation ran on the flag results
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return std::unique_ptr<column> rank values
*/
template <bool has_nulls, typename value_resolver, typename scan_operator>
std::unique_ptr<column> rank_generator(column_view const& order_by,
value_resolver resolver,
scan_operator scan_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Only element 0 of each helper result is used directly; both results stay alive as locals
// until the function returns.
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
// Same table on both sides: a row is compared with its own predecessor.
// NOTE(review): the trailing `true` is presumably `nulls_are_equal` -- confirm.
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
// Output: one size_type per input row, no null mask.
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
auto mutable_ranks = ranks->mutable_view();

// Fill each slot with the resolved "row starts a new run" value; the `row_index == 0` test
// short-circuits so row 0 is never compared with row -1.
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, resolver] __device__(size_type row_index) {
return resolver(row_index == 0 || !comparator(row_index, row_index - 1),
row_index);
});

// In-place inclusive scan (input and output ranges are the same buffer).
thrust::inclusive_scan(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
mutable_ranks.begin<size_type>(),
scan_op);
return ranks;
}

} // namespace

// Dense rank: running sum (DeviceSum) of the per-row flag produced by rank_generator.
// Rejects inputs that are or contain list columns, then picks the has_nulls=true/false
// instantiation of rank_generator depending on has_nested_nulls().
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in dense_rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
// NOTE: despite its name, `equality` receives true when the row DIFFERS from the previous
// row (or is row 0) -- see the tabulate call in rank_generator.
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}
return rank_generator<false>(
order_by,
// Same resolver as above; only the comparator's has_nulls flavour differs.
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}

// Rank: running maximum (DeviceMax) of the 1-based position of each run's first row.
// Rejects inputs that are or contain list columns, then picks the has_nulls=true/false
// instantiation of rank_generator depending on has_nested_nulls().
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in rank scan.");
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
// NOTE: despite its name, `equality` receives true when the row DIFFERS from the previous
// row (or is row 0); such rows yield their 1-based position, all others yield 0.
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}
return rank_generator<false>(
order_by,
// Same resolver as above; only the comparator's has_nulls flavour differs.
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}

std::unique_ptr<column> scan_inclusive(
column_view const& input,
std::unique_ptr<aggregation> const& agg,
Expand All @@ -301,9 +203,7 @@ std::unique_ptr<column> scan_inclusive(
{
auto output = scan_agg_dispatch<scan_dispatcher>(input, agg, null_handling, stream, mr);

if (agg->kind == aggregation::RANK || agg->kind == aggregation::DENSE_RANK) {
return output;
} else if (null_handling == null_policy::EXCLUDE) {
if (null_handling == null_policy::EXCLUDE) {
output->set_null_mask(detail::copy_bitmask(input, stream, mr), input.null_count());
} else if (input.nullable()) {
output->set_null_mask(mask_scan(input, scan_type::INCLUSIVE, stream, mr), UNKNOWN_NULL_COUNT);
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/reductions/scan_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ TEST(RankScanTest, ExclusiveScan)

CUDF_EXPECT_THROW_MESSAGE(
scan(vals, make_dense_rank_aggregation(), scan_type::EXCLUSIVE, null_policy::INCLUDE),
"Unsupported rank aggregation operator for exclusive scan");
"Unsupported dense rank aggregation operator for exclusive scan");
CUDF_EXPECT_THROW_MESSAGE(
scan(vals, make_rank_aggregation(), scan_type::EXCLUSIVE, null_policy::INCLUDE),
"Unsupported rank aggregation operator for exclusive scan");
Expand Down

0 comments on commit 122da20

Please sign in to comment.