From 1448955f3accef8574a2baeab85323452c56f2a6 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Fri, 23 Apr 2021 15:37:58 +0800 Subject: [PATCH 1/6] support null-excluded policy for collect aggregations in groupby --- cpp/src/groupby/sort/aggregate.cpp | 20 ++++--- cpp/src/groupby/sort/group_collect.cu | 72 +++++++++++++++++++++-- cpp/src/groupby/sort/group_reductions.hpp | 3 + cpp/tests/groupby/collect_set_test.cpp | 20 +++---- cpp/tests/groupby/group_collect_test.cpp | 59 +++++++++++++------ 5 files changed, 132 insertions(+), 42 deletions(-) diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index 12f157cd3d9..cb65808f285 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -367,13 +367,15 @@ void aggregrate_result_functor::operator()(aggregatio { auto null_handling = static_cast(agg)._null_handling; - CUDF_EXPECTS(null_handling == null_policy::INCLUDE, - "null exclusion is not supported on groupby COLLECT_LIST aggregation."); if (cache.has_result(col_idx, agg)) return; - auto result = detail::group_collect( - get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr); + auto result = detail::group_collect(get_grouped_values(), + helper.group_offsets(stream), + helper.num_groups(stream), + null_handling, + stream, + mr); cache.add_result(col_idx, agg, std::move(result)); }; @@ -383,13 +385,15 @@ void aggregrate_result_functor::operator()(aggregation { auto const null_handling = static_cast(agg)._null_handling; - CUDF_EXPECTS(null_handling == null_policy::INCLUDE, - "null exclusion is not supported on groupby COLLECT_SET aggregation."); if (cache.has_result(col_idx, agg)) { return; } - auto const collect_result = detail::group_collect( - get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr); + auto const collect_result = detail::group_collect(get_grouped_values(), + helper.group_offsets(stream), + helper.num_groups(stream), + null_handling, + stream, + mr); auto const nulls_equal = static_cast(agg)._nulls_equal; auto const nans_equal = diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index b7bcd05a72a..fde6d2f63ad 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,30 +17,94 @@ #include #include #include +#include #include +#include #include #include #include +#include + namespace cudf { namespace groupby { namespace detail { +/** + * @brief Purge null entries in grouped values, and adjust group offsets. + */ +std::pair, std::unique_ptr> purge_null_entries( + column_view const &values, + column_view const &offsets, + size_type num_groups, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) +{ + auto values_device_view = column_device_view::create(values, stream); + + auto not_null_pred = [d_value = *values_device_view] __device__(auto i) { + return d_value.is_valid_nocheck(i); + }; + + // Purge null entries in grouped values. + auto null_purged_entries = + cudf::detail::copy_if(table_view{{values}}, not_null_pred, stream, mr)->release(); + + std::unique_ptr &null_purged_values = null_purged_entries[0]; + + // Recalculate offsets after null entries are purged. + auto null_purged_sizes = make_fixed_width_column( + data_type{type_to_id()}, num_groups, mask_state::UNALLOCATED, stream, mr); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_groups), + null_purged_sizes->mutable_view().template begin(), + [d_offsets = offsets.template begin(), not_null_pred] __device__(auto i) { + return thrust::count_if(thrust::seq, + thrust::make_counting_iterator(d_offsets[i]), + thrust::make_counting_iterator(d_offsets[i + 1]), + not_null_pred); + }); + + auto null_purged_offsets = strings::detail::make_offsets_child_column( + null_purged_sizes->view().template begin(), + null_purged_sizes->view().template end(), + stream, + mr); + + return std::make_pair, std::unique_ptr>( + std::move(null_purged_values), std::move(null_purged_offsets)); +} + std::unique_ptr group_collect(column_view const &values, cudf::device_span group_offsets, size_type num_groups, + null_policy null_handling, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { rmm::device_buffer offsets_data( group_offsets.data(), group_offsets.size() * sizeof(cudf::size_type), stream, mr); - auto offsets = std::make_unique( + auto offsets_column = std::make_unique( cudf::data_type(cudf::type_to_id()), num_groups + 1, std::move(offsets_data)); + std::unique_ptr child_column; + + // If column of grouped values contains null elements, and null_policy == EXCLUDE, + // those elements must be filtered out, and offsets recomputed. + if (null_handling == null_policy::EXCLUDE && values.has_nulls()) { + std::tie(child_column, offsets_column) = cudf::groupby::detail::purge_null_entries( + values, offsets_column->view(), num_groups, stream, mr); + } else { + child_column = std::make_unique(values, stream, mr); + } + return make_lists_column(num_groups, - std::move(offsets), - std::make_unique(values, stream, mr), + std::move(offsets_column), + std::move(child_column), 0, rmm::device_buffer{0, stream, mr}, stream, diff --git a/cpp/src/groupby/sort/group_reductions.hpp b/cpp/src/groupby/sort/group_reductions.hpp index 71980082156..9e3269f4c0c 100644 --- a/cpp/src/groupby/sort/group_reductions.hpp +++ b/cpp/src/groupby/sort/group_reductions.hpp @@ -357,12 +357,15 @@ std::unique_ptr group_nth_element(column_view const& values, * @param values Grouped values to collect * @param group_offsets Offsets of groups' starting points within @p values * @param num_groups Number of groups + * @param null_handling Exclude nulls while counting if null_policy::EXCLUDE, + * Include nulls if null_policy::INCLUDE. * @param mr Device memory resource used to allocate the returned column's device memory * @param stream CUDA stream used for device memory operations and kernel launches. */ std::unique_ptr group_collect(column_view const& values, cudf::device_span group_offsets, size_type num_groups, + null_policy null_handling, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); diff --git a/cpp/tests/groupby/collect_set_test.cpp b/cpp/tests/groupby/collect_set_test.cpp index 5303b8f4f61..9ae1def8136 100644 --- a/cpp/tests/groupby/collect_set_test.cpp +++ b/cpp/tests/groupby/collect_set_test.cpp @@ -34,6 +34,7 @@ namespace test { #define COLLECT_SET cudf::make_collect_set_aggregation() #define COLLECT_SET_NULL_UNEQUAL \ cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL) +#define COLLECT_SET_NULL_EXCLUDE cudf::make_collect_set_aggregation(null_policy::EXCLUDE) struct CollectSetTest : public cudf::test::BaseFixture { }; @@ -47,17 +48,6 @@ using FixedWidthTypesNotBool = cudf::test::Concat; TYPED_TEST_CASE(CollectSetTypedTest, FixedWidthTypesNotBool); -TYPED_TEST(CollectSetTypedTest, ExceptionTests) -{ - std::vector agg_requests(1); - agg_requests[0].values = COL_V{{1, 2, 3, 4, 5, 6}, {true, false, true, false, true, false}}; - agg_requests[0].aggregations.push_back(cudf::make_collect_list_aggregation(null_policy::EXCLUDE)); - - // groupby cannot exclude nulls - groupby::groupby gby{table_view{{COL_K{1, 1, 2, 2, 3, 3}}}}; - EXPECT_THROW(gby.aggregate(agg_requests), cudf::logic_error); -} - TYPED_TEST(CollectSetTypedTest, TrivialInput) { // Empty input @@ -174,6 +164,10 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) {{20, null, null, null}, VALIDITY{true, false, false, false}}, {{30, 31}, VALIDITY{true, true}}}; test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL); + + // All nulls per key are excluded + vals_expected = LCL_V{{10}, {20}, {30, 31}}; + test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE); } // Expect the result keys to be sorted by sort-based groupby @@ -196,6 +190,10 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) {{null, null, null, null}, VALIDITY{false, false, false, false}}, {{40}, VALIDITY{true}}}; test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL); + + // All nulls per key are excluded + vals_expected = LCL_V{{10}, {20, 21}, {}, {40}}; + test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE); } } diff --git a/cpp/tests/groupby/group_collect_test.cpp b/cpp/tests/groupby/group_collect_test.cpp index 8a578ea0c0f..c38ae631b0a 100644 --- a/cpp/tests/groupby/group_collect_test.cpp +++ b/cpp/tests/groupby/group_collect_test.cpp @@ -68,6 +68,24 @@ TYPED_TEST(groupby_collect_list_test, CollectWithNulls) test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg)); } +TYPED_TEST(groupby_collect_list_test, CollectWithNullExclusion) +{ + using K = int32_t; + using V = TypeParam; + + fixed_width_column_wrapper keys{1, 1, 1, 2, 2, 3, 3, 4, 4}; + + fixed_width_column_wrapper values{ + {1, 2, 3, 4, 5, 6, 7, 8, 9}, {false, true, false, true, false, false, false, true, true}}; + + fixed_width_column_wrapper expect_keys{1, 2, 3, 4}; + + lists_column_wrapper expect_vals{{2}, {4}, {}, {8, 9}}; + + auto agg = cudf::make_collect_list_aggregation(null_policy::EXCLUDE); + test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg)); +} + TYPED_TEST(groupby_collect_list_test, CollectLists) { using K = int32_t; @@ -87,6 +105,28 @@ TYPED_TEST(groupby_collect_list_test, CollectLists) test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg)); } +TYPED_TEST(groupby_collect_list_test, CollectListsWithNullExclusion) +{ + using K = int32_t; + using V = TypeParam; + + using LCW = cudf::test::lists_column_wrapper; + + fixed_width_column_wrapper keys{1, 1, 2, 2, 3, 3, 4, 4}; + const bool validity_mask[8] = {true, false, false, true, true, true, false, false}; + auto validity = cudf::detail::make_counting_transform_iterator( + 0, [&validity_mask](auto i) { return validity_mask[i]; }); + lists_column_wrapper values{ + {{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}, {20, 30, 40}, LCW{}}, validity}; + + fixed_width_column_wrapper expect_keys{1, 2, 3, 4}; + + lists_column_wrapper expect_vals{{{1, 2}}, {LCW{}}, {{9, 10}, {11}}, {}}; + + auto agg = cudf::make_collect_list_aggregation(null_policy::EXCLUDE); + test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg)); +} + TYPED_TEST(groupby_collect_list_test, dictionary) { using K = int32_t; @@ -109,24 +149,5 @@ TYPED_TEST(groupby_collect_list_test, dictionary) keys, vals, expect_keys, expect_vals->view(), cudf::make_collect_list_aggregation()); } -TYPED_TEST(groupby_collect_list_test, CollectFailsWithNullExclusion) -{ - using K = int32_t; - using V = TypeParam; - - fixed_width_column_wrapper keys{1, 1, 2, 2, 3, 3}; - groupby::groupby gby{table_view{{keys}}}; - - fixed_width_column_wrapper values{{1, 2, 3, 4, 5, 6}, - {true, false, true, false, true, false}}; - - std::vector agg_requests(1); - agg_requests[0].values = values; - agg_requests[0].aggregations.push_back(cudf::make_collect_list_aggregation(null_policy::EXCLUDE)); - - CUDF_EXPECT_THROW_MESSAGE(gby.aggregate(agg_requests), - "null exclusion is not supported on groupby COLLECT_LIST aggregation."); -} - } // namespace test } // namespace cudf From 1225a1b0451b4a5e52d2aa5350c42298030ae233 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Sun, 25 Apr 2021 14:48:31 +0800 Subject: [PATCH 2/6] some refinement Signed-off-by: sperlingxx --- cpp/src/groupby/sort/group_collect.cu | 2 +- cpp/tests/groupby/collect_set_test.cpp | 43 ++++++++++++++++---------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index fde6d2f63ad..7751079ddf3 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -53,7 +53,7 @@ std::pair, std::unique_ptr> purge_null_entries( std::unique_ptr &null_purged_values = null_purged_entries[0]; // Recalculate offsets after null entries are purged. - auto null_purged_sizes = make_fixed_width_column( + auto null_purged_sizes = make_numeric_column( data_type{type_to_id()}, num_groups, mask_state::UNALLOCATED, stream, mr); thrust::transform( diff --git a/cpp/tests/groupby/collect_set_test.cpp b/cpp/tests/groupby/collect_set_test.cpp index 9ae1def8136..bcddbcacbb8 100644 --- a/cpp/tests/groupby/collect_set_test.cpp +++ b/cpp/tests/groupby/collect_set_test.cpp @@ -31,10 +31,6 @@ namespace test { #define LCL_V cudf::test::lists_column_wrapper #define LCL_S cudf::test::lists_column_wrapper #define VALIDITY std::initializer_list -#define COLLECT_SET cudf::make_collect_set_aggregation() -#define COLLECT_SET_NULL_UNEQUAL \ - cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL) -#define COLLECT_SET_NULL_EXCLUDE cudf::make_collect_set_aggregation(null_policy::EXCLUDE) struct CollectSetTest : public cudf::test::BaseFixture { }; @@ -60,7 +56,8 @@ TYPED_TEST(CollectSetTypedTest, TrivialInput) COL_V vals{10}; COL_K keys_expected{1}; LCL_V vals_expected{LCL_V{10}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } // Non-repeated keys @@ -69,7 +66,8 @@ TYPED_TEST(CollectSetTypedTest, TrivialInput) COL_V vals{20, 10}; COL_K keys_expected{1, 2}; LCL_V vals_expected{LCL_V{10}, LCL_V{20}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } } @@ -81,7 +79,8 @@ TYPED_TEST(CollectSetTypedTest, TypicalInput) COL_V vals{10, 11, 10, 10, 20, 21, 21, 20, 30, 33, 32, 31}; COL_K keys_expected{1, 2, 3}; LCL_V vals_expected{{10, 11}, {20, 21}, {30, 31, 32, 33}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } // Expect the result keys to be sorted by sort-based groupby @@ -90,7 +89,8 @@ TYPED_TEST(CollectSetTypedTest, TypicalInput) COL_V vals{40, 10, 20, 40, 30, 30, 20, 11}; COL_K keys_expected{1, 2, 3, 4}; LCL_V vals_expected{{10, 11}, {20}, {30}, {40}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } } @@ -104,14 +104,16 @@ TYPED_TEST(CollectSetTypedTest, SlicedColumnsInput) auto const vals = cudf::slice(vals_original, {0, 4})[0]; // { 10, 11, 10, 10 } auto const keys_expected = COL_K{1}; auto const vals_expected = LCL_V{{10, 11}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } { auto const keys = cudf::slice(keys_original, {2, 10})[0]; // { 1, 1, 2, 2, 2, 2, 3, 3 } auto const vals = cudf::slice(vals_original, {2, 10})[0]; // { 10, 10, 20, 21, 21, 20, 30, 33 } auto const keys_expected = COL_K{1, 2, 3}; auto const vals_expected = LCL_V{{10}, {20, 21}, {30, 33}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } } @@ -137,7 +139,8 @@ TEST_F(CollectSetTest, StringInput) LCL_S vals_expected{{"String 1, first", "String 1, second"}, {"String 2, first", "String 2, second"}, {"String 3, first", "String 3, second"}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } TYPED_TEST(CollectSetTypedTest, CollectWithNulls) @@ -157,17 +160,20 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) LCL_V vals_expected{{{10, null}, VALIDITY{true, false}}, {{20, null}, VALIDITY{true, false}}, {{30, 31}, VALIDITY{true, true}}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); // All nulls per key are kept (nulls are put at the end of each list) vals_expected = LCL_V{{{10, null, null}, VALIDITY{true, false, false}}, {{20, null, null, null}, VALIDITY{true, false, false, false}}, {{30, 31}, VALIDITY{true, true}}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL); + agg = cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); // All nulls per key are excluded vals_expected = LCL_V{{10}, {20}, {30, 31}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE); + agg = cudf::make_collect_set_aggregation(null_policy::EXCLUDE); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } // Expect the result keys to be sorted by sort-based groupby @@ -182,18 +188,21 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) {{20, 21}, VALIDITY{true, true}}, {{null}, VALIDITY{false}}, {{40}, VALIDITY{true}}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET); + auto agg = cudf::make_collect_set_aggregation(); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); // All nulls per key are kept (nulls are put at the end of each list) vals_expected = LCL_V{{{10, null}, VALIDITY{true, false}}, {{20, 21}, VALIDITY{true, true}}, {{null, null, null, null}, VALIDITY{false, false, false, false}}, {{40}, VALIDITY{true}}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL); + agg = cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); // All nulls per key are excluded vals_expected = LCL_V{{10}, {20, 21}, {}, {40}}; - test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE); + agg = cudf::make_collect_set_aggregation(null_policy::EXCLUDE); + test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); } } From 630f052b4b5cc0a56bc2d5659b606bde5196597b Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Tue, 27 Apr 2021 16:31:50 +0800 Subject: [PATCH 3/6] some improvements Signed-off-by: sperlingxx --- cpp/src/groupby/sort/group_collect.cu | 10 +++-- cpp/tests/groupby/collect_set_test.cpp | 54 +++++++++++++------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index 7751079ddf3..65653a2c61d 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -85,11 +85,13 @@ std::unique_ptr group_collect(column_view const &values, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { - rmm::device_buffer offsets_data( - group_offsets.data(), group_offsets.size() * sizeof(cudf::size_type), stream, mr); + auto offsets_column = make_numeric_column( + data_type(type_to_id()), num_groups + 1, mask_state::UNALLOCATED, stream, mr); - auto offsets_column = std::make_unique( - cudf::data_type(cudf::type_to_id()), num_groups + 1, std::move(offsets_data)); + thrust::copy(rmm::exec_policy(stream), + group_offsets.begin(), + group_offsets.end(), + offsets_column->mutable_view().template begin()); std::unique_ptr child_column; diff --git a/cpp/tests/groupby/collect_set_test.cpp b/cpp/tests/groupby/collect_set_test.cpp index bcddbcacbb8..ce3a9a49372 100644 --- a/cpp/tests/groupby/collect_set_test.cpp +++ b/cpp/tests/groupby/collect_set_test.cpp @@ -33,6 +33,17 @@ namespace test { #define VALIDITY std::initializer_list struct CollectSetTest : public cudf::test::BaseFixture { + static auto collect_set() { return cudf::make_collect_set_aggregation(); } + + static auto collect_set_null_unequal() + { + return cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL); + } + + static auto collect_set_null_exclude() + { + return cudf::make_collect_set_aggregation(null_policy::EXCLUDE); + } }; template @@ -56,8 +67,7 @@ TYPED_TEST(CollectSetTypedTest, TrivialInput) COL_V vals{10}; COL_K keys_expected{1}; LCL_V vals_expected{LCL_V{10}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } // Non-repeated keys @@ -66,8 +76,7 @@ TYPED_TEST(CollectSetTypedTest, TrivialInput) COL_V vals{20, 10}; COL_K keys_expected{1, 2}; LCL_V vals_expected{LCL_V{10}, LCL_V{20}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } } @@ -79,8 +88,7 @@ TYPED_TEST(CollectSetTypedTest, TypicalInput) COL_V vals{10, 11, 10, 10, 20, 21, 21, 20, 30, 33, 32, 31}; COL_K keys_expected{1, 2, 3}; LCL_V vals_expected{{10, 11}, {20, 21}, {30, 31, 32, 33}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } // Expect the result keys to be sorted by sort-based groupby @@ -89,8 +97,7 @@ TYPED_TEST(CollectSetTypedTest, TypicalInput) COL_V vals{40, 10, 20, 40, 30, 30, 20, 11}; COL_K keys_expected{1, 2, 3, 4}; LCL_V vals_expected{{10, 11}, {20}, {30}, {40}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } } @@ -104,16 +111,14 @@ TYPED_TEST(CollectSetTypedTest, SlicedColumnsInput) auto const vals = cudf::slice(vals_original, {0, 4})[0]; // { 10, 11, 10, 10 } auto const keys_expected = COL_K{1}; auto const vals_expected = LCL_V{{10, 11}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } { auto const keys = cudf::slice(keys_original, {2, 10})[0]; // { 1, 1, 2, 2, 2, 2, 3, 3 } auto const vals = cudf::slice(vals_original, {2, 10})[0]; // { 10, 10, 20, 21, 21, 20, 30, 33 } auto const keys_expected = COL_K{1, 2, 3}; auto const vals_expected = LCL_V{{10}, {20, 21}, {30, 33}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } } @@ -139,8 +144,7 @@ TEST_F(CollectSetTest, StringInput) LCL_S vals_expected{{"String 1, first", "String 1, second"}, {"String 2, first", "String 2, second"}, {"String 3, first", "String 3, second"}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); } TYPED_TEST(CollectSetTypedTest, CollectWithNulls) @@ -160,20 +164,19 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) LCL_V vals_expected{{{10, null}, VALIDITY{true, false}}, {{20, null}, VALIDITY{true, false}}, {{30, 31}, VALIDITY{true, true}}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); // All nulls per key are kept (nulls are put at the end of each list) vals_expected = LCL_V{{{10, null, null}, VALIDITY{true, false, false}}, {{20, null, null, null}, VALIDITY{true, false, false, false}}, {{30, 31}, VALIDITY{true, true}}}; - agg = cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg( + keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set_null_unequal()); // All nulls per key are excluded vals_expected = LCL_V{{10}, {20}, {30, 31}}; - agg = cudf::make_collect_set_aggregation(null_policy::EXCLUDE); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg( + keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set_null_exclude()); } // Expect the result keys to be sorted by sort-based groupby @@ -188,21 +191,20 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls) {{20, 21}, VALIDITY{true, true}}, {{null}, VALIDITY{false}}, {{40}, VALIDITY{true}}}; - auto agg = cudf::make_collect_set_aggregation(); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg(keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set()); // All nulls per key are kept (nulls are put at the end of each list) vals_expected = LCL_V{{{10, null}, VALIDITY{true, false}}, {{20, 21}, VALIDITY{true, true}}, {{null, null, null, null}, VALIDITY{false, false, false, false}}, {{40}, VALIDITY{true}}}; - agg = cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg( + keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set_null_unequal()); // All nulls per key are excluded vals_expected = LCL_V{{10}, {20, 21}, {}, {40}}; - agg = cudf::make_collect_set_aggregation(null_policy::EXCLUDE); - test_single_agg(keys, vals, keys_expected, vals_expected, std::move(agg)); + test_single_agg( + keys, vals, keys_expected, vals_expected, CollectSetTest::collect_set_null_exclude()); } } From 6bd42e18cab7d0a9d3118e2b1700f9a475c923d8 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 28 Apr 2021 11:36:21 +0800 Subject: [PATCH 4/6] refine Signed-off-by: sperlingxx --- cpp/src/groupby/sort/group_collect.cu | 38 ++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index 65653a2c61d..a2f0d6433c4 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -85,24 +85,26 @@ std::unique_ptr group_collect(column_view const &values, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { - auto offsets_column = make_numeric_column( - data_type(type_to_id()), num_groups + 1, mask_state::UNALLOCATED, stream, mr); - - thrust::copy(rmm::exec_policy(stream), - group_offsets.begin(), - group_offsets.end(), - offsets_column->mutable_view().template begin()); - - std::unique_ptr child_column; - - // If column of grouped values contains null elements, and null_policy == EXCLUDE, - // those elements must be filtered out, and offsets recomputed. - if (null_handling == null_policy::EXCLUDE && values.has_nulls()) { - std::tie(child_column, offsets_column) = cudf::groupby::detail::purge_null_entries( - values, offsets_column->view(), num_groups, stream, mr); - } else { - child_column = std::make_unique(values, stream, mr); - } + auto [child_column, + offsets_column] = [null_handling, num_groups, &values, &group_offsets, stream, mr] { + auto offsets_column = make_numeric_column( + data_type(type_to_id()), num_groups + 1, mask_state::UNALLOCATED, stream, mr); + + thrust::copy(rmm::exec_policy(stream), + group_offsets.begin(), + group_offsets.end(), + offsets_column->mutable_view().template begin()); + + // If column of grouped values contains null elements, and null_policy == EXCLUDE, + // those elements must be filtered out, and offsets recomputed. + if (null_handling == null_policy::EXCLUDE && values.has_nulls()) { + return cudf::groupby::detail::purge_null_entries( + values, offsets_column->view(), num_groups, stream, mr); + } else { + return std::make_pair(std::make_unique(values, stream, mr), + std::move(offsets_column)); + } + }(); return make_lists_column(num_groups, std::move(offsets_column), From 44fbfb84056ffb8fbb2c9c3abffa56a7e5d8f92c Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 12 May 2021 15:37:10 +0800 Subject: [PATCH 5/6] some refinement Signed-off-by: sperlingxx --- cpp/src/groupby/sort/group_collect.cu | 23 +++++++++++++---------- cpp/src/groupby/sort/group_reductions.hpp | 2 +- cpp/tests/groupby/collect_list_tests.cpp | 11 ++++------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index a2f0d6433c4..5372fc333ab 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -32,6 +32,13 @@ namespace groupby { namespace detail { /** * @brief Purge null entries in grouped values, and adjust group offsets. + * + * @param values Grouped values to be purged + * @param offsets Offsets of groups' starting points + * @param num_groups Number of groups + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * @return Pair of null eliminated grouped values corresponding offsets */ std::pair, std::unique_ptr> purge_null_entries( column_view const &values, @@ -50,17 +57,16 @@ std::pair, std::unique_ptr> purge_null_entries( auto null_purged_entries = cudf::detail::copy_if(table_view{{values}}, not_null_pred, stream, mr)->release(); - std::unique_ptr &null_purged_values = null_purged_entries[0]; + auto null_purged_values = std::move(null_purged_entries.front()); // Recalculate offsets after null entries are purged. - auto null_purged_sizes = make_numeric_column( - data_type{type_to_id()}, num_groups, mask_state::UNALLOCATED, stream, mr); + rmm::device_uvector null_purged_sizes(num_groups, stream); thrust::transform( rmm::exec_policy(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_groups), - null_purged_sizes->mutable_view().template begin(), + null_purged_sizes.begin(), [d_offsets = offsets.template begin(), not_null_pred] __device__(auto i) { return thrust::count_if(thrust::seq, thrust::make_counting_iterator(d_offsets[i]), @@ -69,10 +75,7 @@ std::pair, std::unique_ptr> purge_null_entries( }); auto null_purged_offsets = strings::detail::make_offsets_child_column( - null_purged_sizes->view().template begin(), - null_purged_sizes->view().template end(), - stream, - mr); + null_purged_sizes.cbegin(), null_purged_sizes.cend(), stream, mr); return std::make_pair, std::unique_ptr>( std::move(null_purged_values), std::move(null_purged_offsets)); @@ -88,12 +91,12 @@ std::unique_ptr group_collect(column_view const &values, auto [child_column, offsets_column] = [null_handling, num_groups, &values, &group_offsets, stream, mr] { auto offsets_column = make_numeric_column( - data_type(type_to_id()), num_groups + 1, mask_state::UNALLOCATED, stream, mr); + data_type(type_to_id()), num_groups + 1, mask_state::UNALLOCATED, stream, mr); thrust::copy(rmm::exec_policy(stream), group_offsets.begin(), group_offsets.end(), - offsets_column->mutable_view().template begin()); + offsets_column->mutable_view().template begin()); // If column of grouped values contains null elements, and null_policy == EXCLUDE, // those elements must be filtered out, and offsets recomputed. diff --git a/cpp/src/groupby/sort/group_reductions.hpp b/cpp/src/groupby/sort/group_reductions.hpp index 9e3269f4c0c..7cc0aea8362 100644 --- a/cpp/src/groupby/sort/group_reductions.hpp +++ b/cpp/src/groupby/sort/group_reductions.hpp @@ -359,8 +359,8 @@ std::unique_ptr group_nth_element(column_view const& values, * @param num_groups Number of groups * @param null_handling Exclude nulls while counting if null_policy::EXCLUDE, * Include nulls if null_policy::INCLUDE. - * @param mr Device memory resource used to allocate the returned column's device memory * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory */ std::unique_ptr group_collect(column_view const& values, cudf::device_span group_offsets, diff --git a/cpp/tests/groupby/collect_list_tests.cpp b/cpp/tests/groupby/collect_list_tests.cpp index c38ae631b0a..7580c1c4e3b 100644 --- a/cpp/tests/groupby/collect_list_tests.cpp +++ b/cpp/tests/groupby/collect_list_tests.cpp @@ -110,18 +110,15 @@ TYPED_TEST(groupby_collect_list_test, CollectListsWithNullExclusion) using K = int32_t; using V = TypeParam; - using LCW = cudf::test::lists_column_wrapper; + using LCW = cudf::test::lists_column_wrapper; fixed_width_column_wrapper keys{1, 1, 2, 2, 3, 3, 4, 4}; - const bool validity_mask[8] = {true, false, false, true, true, true, false, false}; - auto validity = cudf::detail::make_counting_transform_iterator( - 0, [&validity_mask](auto i) { return validity_mask[i]; }); - lists_column_wrapper values{ - {{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}, {20, 30, 40}, LCW{}}, validity}; + const bool validity_mask[] = {true, false, false, true, true, true, false, false}; + LCW values{{{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}, {20, 30, 40}, LCW{}}, validity_mask}; fixed_width_column_wrapper expect_keys{1, 2, 3, 4}; - lists_column_wrapper expect_vals{{{1, 2}}, {LCW{}}, {{9, 10}, {11}}, {}}; + LCW expect_vals{{{1, 2}}, {LCW{}}, {{9, 10}, {11}}, {}}; auto agg = cudf::make_collect_list_aggregation(null_policy::EXCLUDE); test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg)); From ba83f2d78a50984889dc2af78c7f3d59deefa40e Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Wed, 12 May 2021 20:39:29 +0800 Subject: [PATCH 6/6] Update cpp/src/groupby/sort/group_collect.cu Co-authored-by: David Wendt <45795991+davidwendt@users.noreply.github.com> --- cpp/src/groupby/sort/group_collect.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/groupby/sort/group_collect.cu b/cpp/src/groupby/sort/group_collect.cu index 5372fc333ab..1e6a681af94 100644 --- a/cpp/src/groupby/sort/group_collect.cu +++ b/cpp/src/groupby/sort/group_collect.cu @@ -38,7 +38,7 @@ namespace detail { * @param num_groups Number of groups * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory - * @return Pair of null eliminated grouped values corresponding offsets + * @return Pair of null-eliminated grouped values and corresponding offsets */ std::pair, std::unique_ptr> purge_null_entries( column_view const &values,