Skip to content

Commit

Permalink
Merge branch 'branch-22.02' into uppercase-inf-nan
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt committed Nov 30, 2021
2 parents d125430 + 27b7190 commit 0c21f03
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 45 deletions.
71 changes: 37 additions & 34 deletions cpp/include/cudf/detail/scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,25 @@ namespace detail {
/**
* @brief Computes the exclusive scan of a column.
*
* The null values are skipped for the operation, and if an input element
* at `i` is null, then the output element at `i` will also be null.
* The null values are skipped for the operation, and if an input element at `i` is null, then the
* output element at `i` will also be null.
*
* The identity value for the column type as per the aggregation type
* is used for the value of the first element in the output column.
* The identity value for the column type as per the aggregation type is used for the value of the
* first element in the output column.
*
* @throws cudf::logic_error if column data_type is not an arithmetic type.
* Struct columns are allowed with aggregation types Min and Max.
*
* @param input The input column view for the scan
* @param agg unique_ptr to aggregation operator applied by the scan
* @param null_handling Exclude null values when computing the result if
* null_policy::EXCLUDE. Include nulls if null_policy::INCLUDE.
* Any operation with a null results in a null.
* @throws cudf::logic_error if column data_type is neither an arithmetic type nor a struct type,
* or if it is a struct type but the `agg` is not Min or Max.
*
* @param input The input column view for the scan.
* @param agg unique_ptr to aggregation operator applied by the scan.
* @param null_handling Exclude null values when computing the result if null_policy::EXCLUDE.
* Include nulls if null_policy::INCLUDE. Any operation with a null results in
* a null.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Column with scan results
* @param mr Device memory resource used to allocate the returned column's device memory.
* @returns Column with scan results.
*/
std::unique_ptr<column> scan_exclusive(column_view const& input,
std::unique_ptr<aggregation> const& agg,
Expand All @@ -52,22 +55,22 @@ std::unique_ptr<column> scan_exclusive(column_view const& input,
/**
* @brief Computes the inclusive scan of a column.
*
* The null values are skipped for the operation, and if an input element
* at `i` is null, then the output element at `i` will also be null.
* The null values are skipped for the operation, and if an input element at `i` is null, then the
* output element at `i` will also be null.
*
* String columns are allowed with aggregation types Min and Max.
* String and struct columns are allowed with aggregation types Min and Max.
*
* @throws cudf::logic_error if column data_type is not an arithmetic type
* or string type but the `agg` is not Min or Max
* @throws cudf::logic_error if column data_type is not an arithmetic, string, or struct type, or
* if it is a string/struct type but the `agg` is not Min or Max.
*
* @param input The input column view for the scan
* @param agg unique_ptr to aggregation operator applied by the scan
* @param null_handling Exclude null values when computing the result if
* null_policy::EXCLUDE. Include nulls if null_policy::INCLUDE.
* Any operation with a null results in a null.
* @param input The input column view for the scan.
* @param agg unique_ptr to aggregation operator applied by the scan.
* @param null_handling Exclude null values when computing the result if null_policy::EXCLUDE.
* Include nulls if null_policy::INCLUDE. Any operation with a null results in
* a null.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Column with scan results
* @param mr Device memory resource used to allocate the returned column's device memory.
* @returns Column with scan results.
*/
std::unique_ptr<column> scan_inclusive(column_view const& input,
std::unique_ptr<aggregation> const& agg,
Expand All @@ -76,24 +79,24 @@ std::unique_ptr<column> scan_inclusive(column_view const& input,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row ranks for a column
* @brief Generate row ranks for a column.
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return rank values
* @param order_by Input column to generate ranks for.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return rank values.
*/
std::unique_ptr<column> inclusive_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Generate row dense ranks for a column
* @brief Generate row dense ranks for a column.
*
* @param order_by Input column to generate ranks for
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return rank values
* @param order_by Input column to generate ranks for.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return rank values.
*/
std::unique_ptr<column> inclusive_dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/groupby/sort/group_scan_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,13 @@ struct group_scan_functor<K,
auto gather_map_view =
column_view(data_type{type_to_id<offset_type>()}, gather_map.size(), gather_map.data());

//
// Gather the children elements of the prefix min/max struct elements first.
//
// Typically, we should use `get_sliced_child` for each child column to properly handle the
// input if it is a sliced view. However, since the input to this function is just generated
// from groupby internal APIs which is never a sliced view, we just use `child_begin` and
// `child_end` iterators for simplicity.
auto scanned_children =
cudf::detail::gather(
table_view(std::vector<column_view>{values.child_begin(), values.child_end()}),
Expand Down
24 changes: 16 additions & 8 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1496,15 +1496,23 @@ orc_table_view make_orc_table_view(table_view const& table,
append_orc_column(col.child(lists_column_view::child_column_index),
&orc_columns[new_col_idx],
col_meta.child(lists_column_view::child_column_index));
} else if (kind == TypeKind::STRUCT or kind == TypeKind::MAP) {
// MAP: skip to the list child - include grandchildren columns instead of children
auto const real_parent_col =
kind == TypeKind::MAP ? col.child(lists_column_view::child_column_index) : col;
for (auto child_idx = 0; child_idx != real_parent_col.num_children(); ++child_idx) {
append_orc_column(real_parent_col.child(child_idx),
&orc_columns[new_col_idx],
col_meta.child(child_idx));
} else if (kind == TypeKind::STRUCT) {
for (auto child_idx = 0; child_idx != col.num_children(); ++child_idx) {
append_orc_column(
col.child(child_idx), &orc_columns[new_col_idx], col_meta.child(child_idx));
}
} else if (kind == TypeKind::MAP) {
// MAP: skip to the list child - include grandchildren columns instead of children
auto const real_parent_col = col.child(lists_column_view::child_column_index);
auto const& real_parent_meta = col_meta.child(lists_column_view::child_column_index);
CUDF_EXPECTS(real_parent_meta.num_children() == 2,
"Map struct column should have exactly two children");
// process MAP key
append_orc_column(
real_parent_col.child(0), &orc_columns[new_col_idx], real_parent_meta.child(0));
// process MAP value
append_orc_column(
real_parent_col.child(1), &orc_columns[new_col_idx], real_parent_meta.child(1));
}
};

Expand Down
87 changes: 85 additions & 2 deletions cpp/src/reductions/scan/scan_inclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
* limitations under the License.
*/

#include "scan.cuh"
#include <reductions/arg_minmax_util.cuh>
#include <reductions/scan/scan.cuh>

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/reduction.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -150,6 +154,72 @@ struct scan_functor<Op, cudf::string_view> {
}
};

template <typename Op>
struct scan_functor<Op, cudf::struct_view> {
static std::unique_ptr<column> invoke(column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Op is used only to determined if we want to find the min or max element.
auto constexpr is_min_op = std::is_same_v<Op, DeviceMin>;

// Build indices of the scan operation results (ARGMIN/ARGMAX).
// When finding ARGMIN, we need to consider nulls as larger than non-null elements, and the
// opposite for ARGMAX.
auto gather_map = rmm::device_uvector<size_type>(input.size(), stream);
auto const do_scan = [&](auto const& binop) {
thrust::inclusive_scan(rmm::exec_policy(stream),
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(input.size()),
gather_map.begin(),
binop);
};

auto constexpr null_precedence = is_min_op ? cudf::null_order::AFTER : cudf::null_order::BEFORE;
auto const flattened_input = cudf::structs::detail::flatten_nested_columns(
table_view{{input}}, {}, std::vector<null_order>{null_precedence});
auto const d_flattened_input_ptr = table_device_view::create(flattened_input, stream);
auto const flattened_null_precedences =
is_min_op ? cudf::detail::make_device_uvector_async(flattened_input.null_orders(), stream)
: rmm::device_uvector<cudf::null_order>(0, stream);

if (input.has_nulls()) {
auto const binop = cudf::reduction::detail::row_arg_minmax_fn<true>(
input.size(), *d_flattened_input_ptr, flattened_null_precedences.data(), is_min_op);
do_scan(binop);
} else {
auto const binop = cudf::reduction::detail::row_arg_minmax_fn<false>(
input.size(), *d_flattened_input_ptr, flattened_null_precedences.data(), is_min_op);
do_scan(binop);
}

// Gather the children columns of the input column. Must use `get_sliced_child` to properly
// handle input in case it is a sliced view.
auto const input_children = [&] {
auto const it = cudf::detail::make_counting_transform_iterator(
0, [structs_view = structs_column_view{input}, stream](auto const child_idx) {
return structs_view.get_sliced_child(child_idx);
});
return std::vector<column_view>(it, it + input.num_children());
}();

// Gather the children elements of the prefix min/max struct elements for the output.
auto scanned_children = cudf::detail::gather(table_view{input_children},
gather_map,
out_of_bounds_policy::DONT_CHECK,
negative_index_policy::NOT_ALLOWED,
stream,
mr)
->release();

// Don't need to set a null mask because that will be handled at the caller.
return make_structs_column(input.size(),
std::move(scanned_children),
UNKNOWN_NULL_COUNT,
rmm::device_buffer{0, stream, mr});
}
};

/**
* @brief Dispatcher for running a Scan operation on an input column
*
Expand All @@ -161,7 +231,11 @@ struct scan_dispatcher {
template <typename T>
static constexpr bool is_supported()
{
return std::is_invocable_v<Op, T, T> && !cudf::is_dictionary<T>();
if constexpr (std::is_same_v<T, cudf::struct_view>) {
return std::is_same_v<Op, DeviceMin> || std::is_same_v<Op, DeviceMax>;
} else {
return std::is_invocable_v<Op, T, T> && !cudf::is_dictionary<T>();
}
}

public:
Expand Down Expand Up @@ -209,6 +283,15 @@ std::unique_ptr<column> scan_inclusive(
output->set_null_mask(mask_scan(input, scan_type::INCLUSIVE, stream, mr), UNKNOWN_NULL_COUNT);
}

// If the input is a structs column, we also need to push down nulls from the parent output column
// into the children columns.
if (input.type().id() == type_id::STRUCT && output->has_nulls()) {
for (size_type idx = 0; idx < output->num_children(); ++idx) {
structs::detail::superimpose_parent_nulls(
output->view().null_mask(), output->null_count(), output->child(idx), stream, mr);
}
}

return output;
}
} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/reductions/reduction_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/table_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/copying.hpp>
Expand All @@ -28,7 +29,6 @@
#include <cudf/scalar/scalar.hpp>
#include <cudf/types.hpp>
#include <cudf/wrappers/timestamps.hpp>
#include <cudf_test/table_utilities.hpp>

#include <thrust/iterator/counting_iterator.h>

Expand Down
Loading

0 comments on commit 0c21f03

Please sign in to comment.