Skip to content

Commit

Permalink
Added null_policy handling to segmented reductions
Browse files Browse the repository at this point in the history
Co-authored-by: Bradley Dice <[email protected]>
  • Loading branch information
isVoid and bdice committed Dec 14, 2021
1 parent 5c4ddb7 commit 39db304
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 77 deletions.
5 changes: 2 additions & 3 deletions cpp/include/cudf/detail/reduction.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,8 @@ std::unique_ptr<column> segmented_reduce(InputIterator d_in,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto binary_op = sop.get_binary_op();
auto identity = sop.template get_identity<OutputType>();
// auto dev_result = rmm::device_scalar<OutputType>{identity, stream, mr};
auto binary_op = sop.get_binary_op();
auto identity = sop.template get_identity<OutputType>();
auto dev_result = make_fixed_width_column(
data_type{type_to_id<OutputType>()}, num_segments, mask_state::UNALLOCATED, stream, mr);
auto dev_result_mview = dev_result->mutable_view();
Expand Down
6 changes: 6 additions & 0 deletions cpp/include/cudf/detail/reduction_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ std::unique_ptr<column> segmented_sum(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -294,6 +295,7 @@ std::unique_ptr<column> segmented_product(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -315,6 +317,7 @@ std::unique_ptr<column> segmented_min(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -336,6 +339,7 @@ std::unique_ptr<column> segmented_max(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -358,6 +362,7 @@ std::unique_ptr<column> segmented_any(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand All @@ -380,6 +385,7 @@ std::unique_ptr<column> segmented_all(
column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/valid_if.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ __global__ void valid_if_kernel(

size_type block_count = single_lane_block_sum_reduce<block_size, leader_lane>(warp_valid_count);
if (threadIdx.x == 0) { atomicAdd(valid_count, block_count); }
} // namespace detail
}

/**
* @brief Generate a bitmask where every bit is set for which a predicate is
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/cudf/reduction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ std::unique_ptr<scalar> reduce(
* @param agg Aggregation operator applied by the reduction
* @param offsets Indices to segment boundaries
* @param output_dtype The computation and output precision.
* @param null_handling Null policy (`null_policy`, e.g. `INCLUDE`) that selects how null elements are handled by the reduction
* @param mr Device memory resource used to allocate the returned column's device memory
* @returns Output column with segment's reduce result.
*/
Expand All @@ -111,6 +112,7 @@ std::unique_ptr<column> segmented_reduce(
column_view const& offsets,
std::unique_ptr<aggregation> const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/reductions/segmented_all.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "cudf/types.hpp"
#include <cudf/detail/reduction_functions.hpp>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <reductions/simple_segmented.cuh>
Expand All @@ -24,19 +25,22 @@ namespace reduction {
/**
 * @brief Segmented `all` reduction: checks the requested output type and then
 * dispatches on the input column's type to `bool_result_column_dispatcher`
 * instantiated with `cudf::reduction::op::min`.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type; must equal `BOOL8`
 * @param null_handling Null policy; only the updated dispatch call below forwards it
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_all(column_view const& col,
column_view const& offsets,
cudf::data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Reject any output type other than BOOL8; CUDF_EXPECTS reports the quoted message
// (the failure mechanism is defined by the macro, which is not shown here).
// NOTE(review): the message names `all()` although this function is `segmented_all`.
CUDF_EXPECTS(output_dtype == cudf::data_type(cudf::type_id::BOOL8),
"all() operation can be applied with output type `BOOL8` only");

// dispatch for non-dictionary types
// Earlier form of the dispatch shown by the diff: dispatcher taken from `simple`,
// `null_handling` is not forwarded.
return cudf::type_dispatcher(col.type(),
simple::bool_result_column_dispatcher<cudf::reduction::op::min>{},
col,
offsets,
stream,
mr);
// Updated form of the dispatch: dispatcher taken from `simple::detail`, and
// `null_handling` is forwarded between `offsets` and `stream`.
return cudf::type_dispatcher(
col.type(),
simple::detail::bool_result_column_dispatcher<cudf::reduction::op::min>{},
col,
offsets,
null_handling,
stream,
mr);
}

} // namespace reduction
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/reductions/segmented_any.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "cudf/types.hpp"
#include <cudf/detail/reduction_functions.hpp>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <reductions/simple_segmented.cuh>
Expand All @@ -24,19 +25,22 @@ namespace reduction {
/**
 * @brief Segmented `any` reduction: checks the requested output type and then
 * dispatches on the input column's type to `bool_result_column_dispatcher`
 * instantiated with `cudf::reduction::op::max`.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type; must equal `BOOL8`
 * @param null_handling Null policy; only the updated dispatch call below forwards it
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_any(column_view const& col,
column_view const& offsets,
cudf::data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Reject any output type other than BOOL8; CUDF_EXPECTS reports the quoted message
// (the failure mechanism is defined by the macro, which is not shown here).
CUDF_EXPECTS(output_dtype == cudf::data_type(cudf::type_id::BOOL8),
"segmented_any() operation can be applied with output type `bool8` only");

// dispatch for non-dictionary types
// Earlier form of the dispatch shown by the diff: dispatcher taken from `simple`,
// `null_handling` is not forwarded.
return cudf::type_dispatcher(col.type(),
simple::bool_result_column_dispatcher<cudf::reduction::op::max>{},
col,
offsets,
stream,
mr);
// Updated form of the dispatch: dispatcher taken from `simple::detail`, and
// `null_handling` is forwarded between `offsets` and `stream`.
return cudf::type_dispatcher(
col.type(),
simple::detail::bool_result_column_dispatcher<cudf::reduction::op::max>{},
col,
offsets,
null_handling,
stream,
mr);
}

} // namespace reduction
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/reductions/segmented_max.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cudf/detail/reduction_functions.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/types.hpp>
#include <reductions/simple_segmented.cuh>

#include <rmm/cuda_stream_view.hpp>
Expand All @@ -26,16 +27,19 @@ namespace reduction {
/**
 * @brief Segmented `max` reduction: requires the output type to match the input
 * column's type, then dispatches on that type to `same_column_type_dispatcher`
 * instantiated with `cudf::reduction::op::max`.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type; must equal `col.type()`
 * @param null_handling Null policy; only the updated dispatch call below forwards it
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_max(column_view const& col,
column_view const& offsets,
cudf::data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// The output type is not used for dispatch; it is only checked against the input type.
CUDF_EXPECTS(col.type() == output_dtype, "max() operation requires matching output type");
// Earlier form of the dispatch shown by the diff: dispatcher taken from `simple`,
// `null_handling` is not forwarded.
return cudf::type_dispatcher(col.type(),
simple::same_column_type_dispatcher<cudf::reduction::op::max>{},
col,
offsets,
stream,
mr);
// Updated form of the dispatch: dispatcher taken from `simple::detail`, and
// `null_handling` is forwarded between `offsets` and `stream`.
return cudf::type_dispatcher(
col.type(),
simple::detail::same_column_type_dispatcher<cudf::reduction::op::max>{},
col,
offsets,
null_handling,
stream,
mr);
}

} // namespace reduction
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/reductions/segmented_min.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cudf/detail/reduction_functions.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/types.hpp>
#include <reductions/simple_segmented.cuh>

namespace cudf {
Expand All @@ -24,16 +25,19 @@ namespace reduction {
/**
 * @brief Segmented `min` reduction: requires the output type to match the input
 * column's type, then dispatches on that type to `same_column_type_dispatcher`
 * instantiated with `cudf::reduction::op::min`.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type; must equal `col.type()`
 * @param null_handling Null policy; only the updated dispatch call below forwards it
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_min(column_view const& col,
column_view const& offsets,
data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// The output type is not used for dispatch; it is only checked against the input type.
CUDF_EXPECTS(col.type() == output_dtype, "min() operation requires matching output type");
// Earlier form of the dispatch shown by the diff: dispatcher taken from `simple`,
// `null_handling` is not forwarded.
return cudf::type_dispatcher(col.type(),
simple::same_column_type_dispatcher<cudf::reduction::op::min>{},
col,
offsets,
stream,
mr);
// Updated form of the dispatch: dispatcher taken from `simple::detail`, and
// `null_handling` is forwarded between `offsets` and `stream`.
return cudf::type_dispatcher(
col.type(),
simple::detail::same_column_type_dispatcher<cudf::reduction::op::min>{},
col,
offsets,
null_handling,
stream,
mr);
}

} // namespace reduction
Expand Down
18 changes: 11 additions & 7 deletions cpp/src/reductions/segmented_product.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cudf/detail/reduction_functions.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/types.hpp>
#include <reductions/simple_segmented.cuh>

#include <rmm/cuda_stream_view.hpp>
Expand All @@ -26,16 +27,19 @@ namespace reduction {
/**
 * @brief Segmented `product` reduction: dispatches on the input column's type to
 * `column_type_dispatcher` instantiated with `cudf::reduction::op::product`.
 *
 * No validation of `output_dtype` is performed here; it is passed through to the
 * dispatcher unchanged.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type, forwarded to the dispatcher
 * @param null_handling Null policy; only the updated dispatch call below forwards it
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_product(column_view const& col,
column_view const& offsets,
cudf::data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Earlier form of the dispatch shown by the diff: dispatcher taken from `simple`,
// `null_handling` is not forwarded.
return cudf::type_dispatcher(col.type(),
simple::column_type_dispatcher<cudf::reduction::op::product>{},
col,
offsets,
output_dtype,
stream,
mr);
// Updated form of the dispatch: dispatcher taken from `simple::detail`, and
// `null_handling` is forwarded between `output_dtype` and `stream`.
return cudf::type_dispatcher(
col.type(),
simple::detail::column_type_dispatcher<cudf::reduction::op::product>{},
col,
offsets,
output_dtype,
null_handling,
stream,
mr);
}

} // namespace reduction
Expand Down
31 changes: 22 additions & 9 deletions cpp/src/reductions/segmented_reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "cudf/types.hpp"
#include <cudf/column/column.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy.hpp>
Expand All @@ -34,15 +35,22 @@ struct segmented_reduce_dispatch_functor {
column_view const col;
column_view const offsets;
data_type output_dtype;
null_policy null_handling;
rmm::mr::device_memory_resource* mr;
rmm::cuda_stream_view stream;

segmented_reduce_dispatch_functor(column_view const& col,
column_view const& offsets,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col(col), offsets(offsets), output_dtype(output_dtype), mr(mr), stream(stream)
: col(col),
offsets(offsets),
output_dtype(output_dtype),
null_handling(null_handling),
mr(mr),
stream(stream)
{
}

Expand All @@ -51,22 +59,22 @@ struct segmented_reduce_dispatch_functor {
{
switch (k) {
case aggregation::SUM:
return reduction::segmented_sum(col, offsets, output_dtype, stream, mr);
return reduction::segmented_sum(col, offsets, output_dtype, null_handling, stream, mr);
break;
case aggregation::PRODUCT:
return reduction::segmented_product(col, offsets, output_dtype, stream, mr);
return reduction::segmented_product(col, offsets, output_dtype, null_handling, stream, mr);
break;
case aggregation::MIN:
return reduction::segmented_min(col, offsets, output_dtype, stream, mr);
return reduction::segmented_min(col, offsets, output_dtype, null_handling, stream, mr);
break;
case aggregation::MAX:
return reduction::segmented_max(col, offsets, output_dtype, stream, mr);
return reduction::segmented_max(col, offsets, output_dtype, null_handling, stream, mr);
break;
case aggregation::ANY:
return reduction::segmented_any(col, offsets, output_dtype, stream, mr);
return reduction::segmented_any(col, offsets, output_dtype, null_handling, stream, mr);
break;
case aggregation::ALL:
return reduction::segmented_all(col, offsets, output_dtype, stream, mr);
return reduction::segmented_all(col, offsets, output_dtype, null_handling, stream, mr);
break;
default: CUDF_FAIL("Unsupported aggregation type.");
}
Expand All @@ -78,24 +86,29 @@ std::unique_ptr<column> segmented_reduce(
column_view const& offsets,
std::unique_ptr<aggregation> const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
{
// TODO: handle invalid inputs.

return aggregation_dispatcher(
agg->kind, segmented_reduce_dispatch_functor{col, offsets, output_dtype, stream, mr}, agg);
agg->kind,
segmented_reduce_dispatch_functor{col, offsets, output_dtype, null_handling, stream, mr},
agg);
}
} // namespace detail

/**
 * @brief Public `segmented_reduce` wrapper: invokes `CUDF_FUNC_RANGE()` and forwards
 * its arguments to `detail::segmented_reduce` on `rmm::cuda_stream_default`.
 *
 * @param col Input column, forwarded unchanged
 * @param offsets Offsets column, forwarded unchanged
 * @param agg Aggregation describing the reduction, forwarded unchanged
 * @param output_dtype Requested output type, forwarded unchanged
 * @param null_handling Null policy; only the updated forwarding call below passes it on
 * @param mr Device memory resource, forwarded unchanged
 * @return The column returned by `detail::segmented_reduce`
 */
std::unique_ptr<column> segmented_reduce(column_view const& col,
column_view const& offsets,
std::unique_ptr<aggregation> const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
// Earlier form of the forwarding call shown by the diff: no null policy argument.
return detail::segmented_reduce(col, offsets, agg, output_dtype, rmm::cuda_stream_default, mr);
// Updated form: `null_handling` is inserted between `output_dtype` and the stream.
return detail::segmented_reduce(
col, offsets, agg, output_dtype, null_handling, rmm::cuda_stream_default, mr);
}

} // namespace cudf
4 changes: 3 additions & 1 deletion cpp/src/reductions/segmented_sum.cu
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ namespace reduction {
/**
 * @brief Segmented `sum` reduction: dispatches on the input column's type to
 * `column_type_dispatcher` instantiated with `cudf::reduction::op::sum`.
 *
 * No validation of `output_dtype` is performed here; it is passed through to the
 * dispatcher unchanged.
 *
 * @param col Input column; `col.type()` selects the dispatched implementation
 * @param offsets Column forwarded to the dispatcher alongside `col`
 *                (presumably the segment boundaries - confirm against the dispatcher)
 * @param output_dtype Requested output type, forwarded to the dispatcher
 * @param null_handling Null policy, forwarded between `output_dtype` and `stream`
 * @param stream CUDA stream forwarded to the dispatcher
 * @param mr Device memory resource forwarded to the dispatcher
 * @return The column produced by the dispatched functor
 */
std::unique_ptr<cudf::column> segmented_sum(column_view const& col,
column_view const& offsets,
cudf::data_type const output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return cudf::type_dispatcher(col.type(),
// Earlier dispatcher argument shown by the diff (namespace `simple`).
simple::column_type_dispatcher<cudf::reduction::op::sum>{},
// Updated dispatcher argument (namespace `simple::detail`).
simple::detail::column_type_dispatcher<cudf::reduction::op::sum>{},
col,
offsets,
output_dtype,
null_handling,
stream,
mr);
}
Expand Down
Loading

0 comments on commit 39db304

Please sign in to comment.