Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support exclude null_policy for collect list/set in groupby #8044

Merged
merged 9 commits into from
May 13, 2021
20 changes: 12 additions & 8 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,15 @@ void aggregrate_result_functor::operator()<aggregation::COLLECT_LIST>(aggregatio
{
auto null_handling =
static_cast<cudf::detail::collect_list_aggregation const&>(agg)._null_handling;
CUDF_EXPECTS(null_handling == null_policy::INCLUDE,
"null exclusion is not supported on groupby COLLECT_LIST aggregation.");

if (cache.has_result(col_idx, agg)) return;

auto result = detail::group_collect(
get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr);
auto result = detail::group_collect(get_grouped_values(),
helper.group_offsets(stream),
helper.num_groups(stream),
null_handling,
stream,
mr);

cache.add_result(col_idx, agg, std::move(result));
};
Expand All @@ -383,13 +385,15 @@ void aggregrate_result_functor::operator()<aggregation::COLLECT_SET>(aggregation
{
auto const null_handling =
static_cast<cudf::detail::collect_set_aggregation const&>(agg)._null_handling;
CUDF_EXPECTS(null_handling == null_policy::INCLUDE,
"null exclusion is not supported on groupby COLLECT_SET aggregation.");

if (cache.has_result(col_idx, agg)) { return; }

auto const collect_result = detail::group_collect(
get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr);
auto const collect_result = detail::group_collect(get_grouped_values(),
helper.group_offsets(stream),
helper.num_groups(stream),
null_handling,
stream,
mr);
auto const nulls_equal =
static_cast<cudf::detail::collect_set_aggregation const&>(agg)._nulls_equal;
auto const nans_equal =
Expand Down
72 changes: 68 additions & 4 deletions cpp/src/groupby/sort/group_collect.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,30 +17,94 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/copy_if.cuh>
#include <cudf/detail/gather.cuh>
#include <cudf/strings/detail/utilities.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <memory>

namespace cudf {
namespace groupby {
namespace detail {
/**
* @brief Purge null entries in grouped values, and adjust group offsets.
*/
sperlingxx marked this conversation as resolved.
Show resolved Hide resolved
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> purge_null_entries(
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
column_view const &values,
column_view const &offsets,
size_type num_groups,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
auto values_device_view = column_device_view::create(values, stream);

auto not_null_pred = [d_value = *values_device_view] __device__(auto i) {
return d_value.is_valid_nocheck(i);
};

// Purge null entries in grouped values.
auto null_purged_entries =
cudf::detail::copy_if(table_view{{values}}, not_null_pred, stream, mr)->release();

std::unique_ptr<column> &null_purged_values = null_purged_entries[0];
sperlingxx marked this conversation as resolved.
Show resolved Hide resolved

// Recalculate offsets after null entries are purged.
auto null_purged_sizes = make_fixed_width_column(
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
data_type{type_to_id<size_type>()}, num_groups, mask_state::UNALLOCATED, stream, mr);

thrust::transform(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_groups),
null_purged_sizes->mutable_view().template begin<size_type>(),
[d_offsets = offsets.template begin<size_type>(), not_null_pred] __device__(auto i) {
return thrust::count_if(thrust::seq,
thrust::make_counting_iterator<size_type>(d_offsets[i]),
thrust::make_counting_iterator<size_type>(d_offsets[i + 1]),
not_null_pred);
});

auto null_purged_offsets = strings::detail::make_offsets_child_column(
ttnghia marked this conversation as resolved.
Show resolved Hide resolved
null_purged_sizes->view().template begin<size_type>(),
null_purged_sizes->view().template end<size_type>(),
stream,
mr);

return std::make_pair<std::unique_ptr<column>, std::unique_ptr<column>>(
std::move(null_purged_values), std::move(null_purged_offsets));
}

std::unique_ptr<column> group_collect(column_view const &values,
cudf::device_span<size_type const> group_offsets,
size_type num_groups,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr)
{
rmm::device_buffer offsets_data(
group_offsets.data(), group_offsets.size() * sizeof(cudf::size_type), stream, mr);

auto offsets = std::make_unique<cudf::column>(
auto offsets_column = std::make_unique<cudf::column>(
cudf::data_type(cudf::type_to_id<cudf::size_type>()), num_groups + 1, std::move(offsets_data));
ttnghia marked this conversation as resolved.
Show resolved Hide resolved

std::unique_ptr<cudf::column> child_column;

// If column of grouped values contains null elements, and null_policy == EXCLUDE,
// those elements must be filtered out, and offsets recomputed.
if (null_handling == null_policy::EXCLUDE && values.has_nulls()) {
std::tie(child_column, offsets_column) = cudf::groupby::detail::purge_null_entries(
values, offsets_column->view(), num_groups, stream, mr);
} else {
child_column = std::make_unique<cudf::column>(values, stream, mr);
}

return make_lists_column(num_groups,
std::move(offsets),
std::make_unique<cudf::column>(values, stream, mr),
std::move(offsets_column),
std::move(child_column),
0,
rmm::device_buffer{0, stream, mr},
stream,
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,15 @@ std::unique_ptr<column> group_nth_element(column_view const& values,
* @param values Grouped values to collect
* @param group_offsets Offsets of groups' starting points within @p values
* @param num_groups Number of groups
* @param null_handling Exclude nulls while counting if null_policy::EXCLUDE,
* Include nulls if null_policy::INCLUDE.
* @param mr Device memory resource used to allocate the returned column's device memory
* @param stream CUDA stream used for device memory operations and kernel launches.
sperlingxx marked this conversation as resolved.
Show resolved Hide resolved
*/
std::unique_ptr<column> group_collect(column_view const& values,
cudf::device_span<size_type const> group_offsets,
size_type num_groups,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

Expand Down
20 changes: 9 additions & 11 deletions cpp/tests/groupby/collect_set_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace test {
#define COLLECT_SET cudf::make_collect_set_aggregation()
#define COLLECT_SET_NULL_UNEQUAL \
cudf::make_collect_set_aggregation(null_policy::INCLUDE, null_equality::UNEQUAL)
#define COLLECT_SET_NULL_EXCLUDE cudf::make_collect_set_aggregation(null_policy::EXCLUDE)
ttnghia marked this conversation as resolved.
Show resolved Hide resolved

struct CollectSetTest : public cudf::test::BaseFixture {
};
Expand All @@ -47,17 +48,6 @@ using FixedWidthTypesNotBool = cudf::test::Concat<cudf::test::IntegralTypesNotBo
cudf::test::TimestampTypes>;
TYPED_TEST_CASE(CollectSetTypedTest, FixedWidthTypesNotBool);

TYPED_TEST(CollectSetTypedTest, ExceptionTests)
{
std::vector<groupby::aggregation_request> agg_requests(1);
agg_requests[0].values = COL_V{{1, 2, 3, 4, 5, 6}, {true, false, true, false, true, false}};
agg_requests[0].aggregations.push_back(cudf::make_collect_list_aggregation(null_policy::EXCLUDE));

// groupby cannot exclude nulls
groupby::groupby gby{table_view{{COL_K{1, 1, 2, 2, 3, 3}}}};
EXPECT_THROW(gby.aggregate(agg_requests), cudf::logic_error);
}

TYPED_TEST(CollectSetTypedTest, TrivialInput)
{
// Empty input
Expand Down Expand Up @@ -174,6 +164,10 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls)
{{20, null, null, null}, VALIDITY{true, false, false, false}},
{{30, 31}, VALIDITY{true, true}}};
test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL);

// All nulls per key are excluded
vals_expected = LCL_V{{10}, {20}, {30, 31}};
test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE);
}

// Expect the result keys to be sorted by sort-based groupby
Expand All @@ -196,6 +190,10 @@ TYPED_TEST(CollectSetTypedTest, CollectWithNulls)
{{null, null, null, null}, VALIDITY{false, false, false, false}},
{{40}, VALIDITY{true}}};
test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_UNEQUAL);

// All nulls per key are excluded
vals_expected = LCL_V{{10}, {20, 21}, {}, {40}};
test_single_agg(keys, vals, keys_expected, vals_expected, COLLECT_SET_NULL_EXCLUDE);
}
}

Expand Down
59 changes: 40 additions & 19 deletions cpp/tests/groupby/group_collect_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ TYPED_TEST(groupby_collect_list_test, CollectWithNulls)
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_list_test, CollectWithNullExclusion)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 1, 2, 2, 3, 3, 4, 4};

fixed_width_column_wrapper<V, int32_t> values{
{1, 2, 3, 4, 5, 6, 7, 8, 9}, {false, true, false, true, false, false, false, true, true}};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3, 4};

lists_column_wrapper<V, int32_t> expect_vals{{2}, {4}, {}, {8, 9}};

auto agg = cudf::make_collect_list_aggregation(null_policy::EXCLUDE);
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_list_test, CollectLists)
{
using K = int32_t;
Expand All @@ -87,6 +105,28 @@ TYPED_TEST(groupby_collect_list_test, CollectLists)
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_list_test, CollectListsWithNullExclusion)
{
using K = int32_t;
using V = TypeParam;

using LCW = cudf::test::lists_column_wrapper<TypeParam, int32_t>;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3, 4, 4};
const bool validity_mask[8] = {true, false, false, true, true, true, false, false};
auto validity = cudf::detail::make_counting_transform_iterator(
0, [&validity_mask](auto i) { return validity_mask[i]; });
lists_column_wrapper<V, int32_t> values{
{{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}, {20, 30, 40}, LCW{}}, validity};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3, 4};

lists_column_wrapper<V, int32_t> expect_vals{{{1, 2}}, {LCW{}}, {{9, 10}, {11}}, {}};

auto agg = cudf::make_collect_list_aggregation(null_policy::EXCLUDE);
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_list_test, dictionary)
{
using K = int32_t;
Expand All @@ -109,24 +149,5 @@ TYPED_TEST(groupby_collect_list_test, dictionary)
keys, vals, expect_keys, expect_vals->view(), cudf::make_collect_list_aggregation());
}

TYPED_TEST(groupby_collect_list_test, CollectFailsWithNullExclusion)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3};
groupby::groupby gby{table_view{{keys}}};

fixed_width_column_wrapper<V, int32_t> values{{1, 2, 3, 4, 5, 6},
{true, false, true, false, true, false}};

std::vector<groupby::aggregation_request> agg_requests(1);
agg_requests[0].values = values;
agg_requests[0].aggregations.push_back(cudf::make_collect_list_aggregation(null_policy::EXCLUDE));

CUDF_EXPECT_THROW_MESSAGE(gby.aggregate(agg_requests),
"null exclusion is not supported on groupby COLLECT_LIST aggregation.");
}

} // namespace test
} // namespace cudf