Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support collect aggregations in reduction #10353

Merged
merged 4 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ add_library(
src/quantiles/quantiles.cu
src/reductions/all.cu
src/reductions/any.cu
src/reductions/collect_ops.cu
src/reductions/max.cu
src/reductions/mean.cu
src/reductions/min.cu
Expand Down
67 changes: 66 additions & 1 deletion cpp/include/cudf/detail/reduction_functions.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
#include <cudf/column/column_view.hpp>
#include <cudf/scalar/scalar.hpp>

#include "cudf/lists/lists_column_view.hpp"
#include <rmm/cuda_stream_view.hpp>

namespace cudf {
Expand Down Expand Up @@ -254,5 +255,69 @@ std::unique_ptr<scalar> nth_element(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Collect input column into a (list) scalar
*
* @param col input column to collect from
* @param null_handling Indicates if null values will be counted while collecting.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return collected list as scalar
*/
std::unique_ptr<scalar> collect_list(
column_view const& col,
null_policy null_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Merge a bunch of list scalars into single list scalar
*
* @param col input list column representing numbers of list scalars to be merged
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return merged list as scalar
*/
std::unique_ptr<scalar> merge_lists(
lists_column_view const& col,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Collect input column into a (list) scalar without duplicated elements
*
* @param col input column to collect from
* @param null_handling Indicates if null values will be counted while collecting.
* @param nulls_equal Indicates if null values will be considered as equal values.
* @param nans_equal Indicates if nan values will be considered as equal values.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return collected list with unique elements as scalar
*/
std::unique_ptr<scalar> collect_set(
column_view const& col,
null_policy null_handling,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Merge a bunch of list scalars into single list scalar then drop duplicated elements
*
* @param col input list column representing numbers of list scalars to be merged
* @param nulls_equal Indicates if null values will be considered as equal values.
* @param nans_equal Indicates if nan values will be considered as equal values.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @return collected list with unique elements as scalar
*/
std::unique_ptr<scalar> merge_sets(
lists_column_view const& col,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace reduction
} // namespace cudf
92 changes: 92 additions & 0 deletions cpp/src/reductions/collect_ops.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_view.hpp>
#include <cudf/detail/copy_if.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/reduction_functions.hpp>
#include <cudf/lists/drop_list_duplicates.hpp>
#include <cudf/lists/lists_column_factories.hpp>
#include <cudf/lists/lists_column_view.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/scalar/scalar_factories.hpp>

namespace cudf {
namespace reduction {

std::unique_ptr<scalar> drop_duplicates(list_scalar const& scalar,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto list_wrapper = lists::detail::make_lists_column_from_scalar(scalar, 1, stream, mr);
auto lcw = lists_column_view(list_wrapper->view());
auto no_dup_wrapper = lists::drop_list_duplicates(lcw, nulls_equal, nans_equal, mr);
auto no_dup = lists_column_view(no_dup_wrapper->view()).get_sliced_child(stream);
return make_list_scalar(no_dup, stream, mr);
}

std::unique_ptr<scalar> collect_list(column_view const& col,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (null_handling == null_policy::EXCLUDE && col.has_nulls()) {
auto d_view = column_device_view::create(col, stream);
auto filter = detail::validity_accessor(*d_view);
auto null_purged_table = detail::copy_if(table_view{{col}}, filter, stream, mr);
column* null_purged_col = null_purged_table->release().front().release();
null_purged_col->set_null_mask(rmm::device_buffer{0, stream, mr}, 0);
return std::make_unique<list_scalar>(std::move(*null_purged_col), true, stream, mr);
} else {
return make_list_scalar(col, stream, mr);
}
}

std::unique_ptr<scalar> merge_lists(lists_column_view const& col,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto flatten_col = col.get_sliced_child(stream);
return make_list_scalar(flatten_col, stream, mr);
}

std::unique_ptr<scalar> collect_set(column_view const& col,
null_policy null_handling,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto scalar = collect_list(col, null_handling, stream, mr);
auto ls = dynamic_cast<list_scalar*>(scalar.get());
return drop_duplicates(*ls, nulls_equal, nans_equal, stream, mr);
}

std::unique_ptr<scalar> merge_sets(lists_column_view const& col,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto flatten_col = col.get_sliced_child(stream);
auto scalar = std::make_unique<list_scalar>(flatten_col, true, stream, mr);
return drop_duplicates(*scalar, nulls_equal, nans_equal, stream, mr);
}

} // namespace reduction
} // namespace cudf
16 changes: 16 additions & 0 deletions cpp/src/reductions/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ struct reduce_dispatch_functor {
auto nth_agg = dynamic_cast<nth_element_aggregation const*>(agg.get());
return reduction::nth_element(col, nth_agg->_n, nth_agg->_null_handling, stream, mr);
} break;
case aggregation::COLLECT_LIST: {
auto col_agg = dynamic_cast<collect_list_aggregation const*>(agg.get());
return reduction::collect_list(col, col_agg->_null_handling, stream, mr);
} break;
case aggregation::COLLECT_SET: {
auto col_agg = dynamic_cast<collect_set_aggregation const*>(agg.get());
return reduction::collect_set(
col, col_agg->_null_handling, col_agg->_nulls_equal, col_agg->_nans_equal, stream, mr);
} break;
case aggregation::MERGE_LISTS: {
return reduction::merge_lists(col, stream, mr);
} break;
case aggregation::MERGE_SETS: {
auto col_agg = dynamic_cast<merge_sets_aggregation const*>(agg.get());
return reduction::merge_sets(col, col_agg->_nulls_equal, col_agg->_nans_equal, stream, mr);
} break;
default: CUDF_FAIL("Unsupported reduction operator");
}
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ ConfigureTest(
# ##################################################################################################
# * reduction tests -------------------------------------------------------------------------------
ConfigureTest(
REDUCTION_TEST reductions/rank_tests.cpp reductions/reduction_tests.cpp reductions/scan_tests.cpp
REDUCTION_TEST reductions/collect_ops_tests.cpp reductions/rank_tests.cpp
reductions/reduction_tests.cpp reductions/scan_tests.cpp
)

# ##################################################################################################
Expand Down
Loading