diff --git a/cpp/include/cudf/aggregation.hpp b/cpp/include/cudf/aggregation.hpp index 6661f518639..374af536dc5 100644 --- a/cpp/include/cudf/aggregation.hpp +++ b/cpp/include/cudf/aggregation.hpp @@ -503,9 +503,12 @@ std::unique_ptr make_merge_m2_aggregation(); * * Compute covariance between two columns. * The input columns are child columns of a non-nullable struct columns. + * @param min_periods Minimum number of non-null observations required to produce a result. + * @param ddof Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N is + * the number of non-null observations. */ template -std::unique_ptr make_covariance_aggregation(); +std::unique_ptr make_covariance_aggregation(size_type min_periods = 1, size_type ddof = 1); /** * @brief Factory to create a CORRELATION aggregation @@ -513,10 +516,12 @@ std::unique_ptr make_covariance_aggregation(); * Compute correlation coefficient between two columns. * The input columns are child columns of a non-nullable struct columns. * - * @param[in] type: correlation_type + * @param type correlation_type + * @param min_periods Minimum number of non-null observations required to produce a result. */ template -std::unique_ptr make_correlation_aggregation(correlation_type type); +std::unique_ptr make_correlation_aggregation(correlation_type type, + size_type min_periods = 1); /** * @brief Factory to create a TDIGEST aggregation diff --git a/cpp/include/cudf/detail/aggregation/aggregation.hpp b/cpp/include/cudf/detail/aggregation/aggregation.hpp index e12ed3f521e..69bde7f57fd 100644 --- a/cpp/include/cudf/detail/aggregation/aggregation.hpp +++ b/cpp/include/cudf/detail/aggregation/aggregation.hpp @@ -901,7 +901,14 @@ class merge_m2_aggregation final : public groupby_aggregation { */ class covariance_aggregation final : public groupby_aggregation { public: - explicit covariance_aggregation() : aggregation{COVARIANCE} {} + explicit covariance_aggregation(size_type min_periods, size_type ddof) + : aggregation{COVARIANCE}, _min_periods{min_periods}, _ddof(ddof) + { + } + size_type _min_periods; + size_type _ddof; + + size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); } std::unique_ptr clone() const override { @@ -913,6 +920,12 @@ class covariance_aggregation final : public groupby_aggregation { return collector.visit(col_type, *this); } void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } + + protected: + size_t hash_impl() const + { + return std::hash{}(_min_periods) ^ std::hash{}(_ddof); + } }; /** @@ -920,8 +933,12 @@ class covariance_aggregation final : public groupby_aggregation { */ class correlation_aggregation final : public groupby_aggregation { public: - explicit correlation_aggregation(correlation_type type) : aggregation{CORRELATION}, _type{type} {} + explicit correlation_aggregation(correlation_type type, size_type min_periods) + : aggregation{CORRELATION}, _type{type}, _min_periods{min_periods} + { + } correlation_type _type; + size_type _min_periods; bool is_equal(aggregation const& _other) const override { @@ -944,7 +961,10 @@ class correlation_aggregation final : public groupby_aggregation { void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); } protected: - size_t hash_impl() const { return std::hash{}(static_cast(_type)); } + size_t hash_impl() const + { + return std::hash{}(static_cast(_type)) ^ std::hash{}(_min_periods); + } }; /** diff --git a/cpp/src/aggregation/aggregation.cpp b/cpp/src/aggregation/aggregation.cpp index 3c6ab157d46..31bf9d65d56 100644 --- a/cpp/src/aggregation/aggregation.cpp +++ b/cpp/src/aggregation/aggregation.cpp @@ -713,23 +713,25 @@ template std::unique_ptr make_merge_m2_aggregation -std::unique_ptr make_covariance_aggregation() +std::unique_ptr make_covariance_aggregation(size_type min_periods, size_type ddof) { - return std::make_unique(); + return std::make_unique(min_periods, ddof); } -template std::unique_ptr make_covariance_aggregation(); -template std::unique_ptr make_covariance_aggregation(); +template std::unique_ptr make_covariance_aggregation( + size_type min_periods, size_type ddof); +template std::unique_ptr make_covariance_aggregation( + size_type min_periods, size_type ddof); /// Factory to create a CORRELATION aggregation template -std::unique_ptr make_correlation_aggregation(correlation_type type) +std::unique_ptr make_correlation_aggregation(correlation_type type, size_type min_periods) { - return std::make_unique(type); + return std::make_unique(type, min_periods); } template std::unique_ptr make_correlation_aggregation( - correlation_type type); + correlation_type type, size_type min_periods); template std::unique_ptr make_correlation_aggregation( - correlation_type type); + correlation_type type, size_type min_periods); template std::unique_ptr make_tdigest_aggregation(int max_centroids) diff --git a/cpp/src/groupby/sort/aggregate.cpp b/cpp/src/groupby/sort/aggregate.cpp index e471fccda07..83c6c1bca57 100644 --- a/cpp/src/groupby/sort/aggregate.cpp +++ b/cpp/src/groupby/sort/aggregate.cpp @@ -573,6 +573,7 @@ void aggregate_result_functor::operator()(aggregation c CUDF_EXPECTS(values.num_children() == 2, "Input to `groupby covariance` must be a structs column having 2 children columns."); + auto const& cov_agg = dynamic_cast(agg); // Covariance only for valid values in both columns. // in non-identical null mask cases, this prevents caching of the results - STD, MEAN, COUNT. auto [_, values_child0, values_child1] = @@ -596,6 +597,8 @@ void aggregate_result_functor::operator()(aggregation c count, mean0, mean1, + cov_agg._min_periods, + cov_agg._ddof, stream, mr)); }; @@ -629,28 +632,33 @@ void aggregate_result_functor::operator()(aggregation aggregate_result_functor(values_child0, helper, cache, stream, mr).operator()(*std_agg); aggregate_result_functor(values_child1, helper, cache, stream, mr).operator()(*std_agg); - auto const stddev0 = cache.get_result(values_child0, *std_agg); - auto const stddev1 = cache.get_result(values_child1, *std_agg); - - auto mean_agg = make_mean_aggregation(); - auto const mean0 = cache.get_result(values_child0, *mean_agg); - auto const mean1 = cache.get_result(values_child1, *mean_agg); - auto count_agg = make_count_aggregation(); - auto const count = cache.get_result(values_child0, *count_agg); - // Compute covariance here to avoid repeated computation of mean & count - auto cov_agg = make_covariance_aggregation(); - cache.add_result(values, - *cov_agg, - detail::group_covariance(get_grouped_values().child(0), - get_grouped_values().child(1), - helper.group_labels(stream), - helper.num_groups(stream), - count, - mean0, - mean1, - stream, - mr)); + auto cov_agg = make_covariance_aggregation(corr_agg._min_periods); + if (not cache.has_result(values, *cov_agg)) { + auto mean_agg = make_mean_aggregation(); + auto const mean0 = cache.get_result(values_child0, *mean_agg); + auto const mean1 = cache.get_result(values_child1, *mean_agg); + auto count_agg = make_count_aggregation(); + auto const count = cache.get_result(values_child0, *count_agg); + + auto const& cov_agg_obj = dynamic_cast(*cov_agg); + cache.add_result(values, + *cov_agg, + detail::group_covariance(get_grouped_values().child(0), + get_grouped_values().child(1), + helper.group_labels(stream), + helper.num_groups(stream), + count, + mean0, + mean1, + cov_agg_obj._min_periods, + cov_agg_obj._ddof, + stream, + mr)); + } + + auto const stddev0 = cache.get_result(values_child0, *std_agg); + auto const stddev1 = cache.get_result(values_child1, *std_agg); auto const covariance = cache.get_result(values, *cov_agg); cache.add_result( values, agg, detail::group_correlation(covariance, stddev0, stddev1, stream, mr)); diff --git a/cpp/src/groupby/sort/group_correlation.cu b/cpp/src/groupby/sort/group_correlation.cu index e43d0185e93..cdcf4311be7 100644 --- a/cpp/src/groupby/sort/group_correlation.cu +++ b/cpp/src/groupby/sort/group_correlation.cu @@ -113,6 +113,8 @@ std::unique_ptr group_covariance(column_view const& values_0, column_view const& count, column_view const& mean_0, column_view const& mean_1, + size_type min_periods, + size_type ddof, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -140,8 +142,13 @@ std::unique_ptr group_covariance(column_view const& values_0, auto d_values_0 = column_device_view::create(values_0, stream); auto d_values_1 = column_device_view::create(values_1, stream); - covariance_transform covariance_transform_op{ - *d_values_0, *d_values_1, mean0_ptr, mean1_ptr, count.data(), group_labels.begin()}; + covariance_transform covariance_transform_op{*d_values_0, + *d_values_1, + mean0_ptr, + mean1_ptr, + count.data(), + group_labels.begin(), + ddof}; auto result = make_numeric_column( data_type(type_to_id()), num_groups, mask_state::UNALLOCATED, stream, mr); @@ -157,8 +164,8 @@ std::unique_ptr group_covariance(column_view const& values_0, thrust::make_discard_iterator(), d_result); - auto is_null = [ddof = covariance_transform_op.ddof] __device__(size_type group_size) { - return not(group_size == 0 or group_size - ddof <= 0); + auto is_null = [ddof, min_periods] __device__(size_type group_size) { + return not(group_size == 0 or group_size - ddof <= 0 or group_size < min_periods); }; auto [new_nullmask, null_count] = cudf::detail::valid_if(count.begin(), count.end(), is_null, stream, mr); diff --git a/cpp/src/groupby/sort/group_reductions.hpp b/cpp/src/groupby/sort/group_reductions.hpp index 789a289a07e..75708c7b01c 100644 --- a/cpp/src/groupby/sort/group_reductions.hpp +++ b/cpp/src/groupby/sort/group_reductions.hpp @@ -451,6 +451,8 @@ std::unique_ptr group_merge_m2(column_view const& values, * @param count The count of valid rows of the grouped values of both columns * @param mean_0 The mean of the first grouped values column * @param mean_1 The mean of the second grouped values column + * @param min_periods The minimum number of non-null rows required to consider the covariance + * @param ddof The delta degrees of freedom used in the calculation of the variance * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory */ @@ -461,6 +463,8 @@ std::unique_ptr group_covariance(column_view const& values_0, column_view const& count, column_view const& mean_0, column_view const& mean_1, + size_type min_periods, + size_type ddof, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); diff --git a/cpp/tests/groupby/correlation_tests.cpp b/cpp/tests/groupby/correlation_tests.cpp index 90d230ef1eb..4aa4ef236f0 100644 --- a/cpp/tests/groupby/correlation_tests.cpp +++ b/cpp/tests/groupby/correlation_tests.cpp @@ -32,7 +32,8 @@ using namespace cudf::test::iterators; namespace cudf { namespace test { -using structs = structs_column_wrapper; +constexpr auto nan = std::numeric_limits::quiet_NaN(); +using structs = structs_column_wrapper; template struct groupby_correlation_test : public cudf::test::BaseFixture { @@ -54,8 +55,7 @@ TYPED_TEST(groupby_correlation_test, basic) auto vals = structs{{member_0, member_1}}; fixed_width_column_wrapper expect_keys{1, 2, 3}; - fixed_width_column_wrapper expect_vals{ - {1.0, 0.6, std::numeric_limits::quiet_NaN()}}; + fixed_width_column_wrapper expect_vals{{1.0, 0.6, nan}}; auto agg = cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON); @@ -129,8 +129,7 @@ TYPED_TEST(groupby_correlation_test, null_keys_and_values) auto vals = structs{{val0, val1}}; fixed_width_column_wrapper expect_keys({1, 2, 3, 4}, no_nulls()); - fixed_width_column_wrapper expect_vals( - {1.0, 0.6, std::numeric_limits::quiet_NaN(), 0.}, {1, 1, 1, 0}); + fixed_width_column_wrapper expect_vals({1.0, 0.6, nan, 0.}, {1, 1, 1, 0}); auto agg = cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON); @@ -153,8 +152,7 @@ TYPED_TEST(groupby_correlation_test, null_values_same) auto vals = structs{{val0, val1}}; fixed_width_column_wrapper expect_keys({1, 2, 3, 4}, no_nulls()); - fixed_width_column_wrapper expect_vals( - {1.0, 0.6, std::numeric_limits::quiet_NaN(), 0.}, {1, 1, 1, 0}); + fixed_width_column_wrapper expect_vals({1.0, 0.6, nan, 0.}, {1, 1, 1, 0}); auto agg = cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON); @@ -181,14 +179,41 @@ TYPED_TEST(groupby_correlation_test, null_values_different) auto vals = structs{{val0, val1}}; fixed_width_column_wrapper expect_keys({1, 2, 3, 4}, no_nulls()); - fixed_width_column_wrapper expect_vals({1.0, 0., std::numeric_limits::quiet_NaN(), 0.}, - {1, 1, 1, 0}); + fixed_width_column_wrapper expect_vals({1.0, 0., nan, 0.}, {1, 1, 1, 0}); auto agg = cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON); test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg), force_use_sort_impl::YES); } +TYPED_TEST(groupby_correlation_test, min_periods) +{ + using V = TypeParam; + using R = cudf::detail::target_type_t; + + auto keys = fixed_width_column_wrapper{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}}; + auto member_0 = fixed_width_column_wrapper{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}}; + auto member_1 = fixed_width_column_wrapper{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}}; + auto vals = structs{{member_0, member_1}}; + + fixed_width_column_wrapper expect_keys{1, 2, 3}; + + fixed_width_column_wrapper expect_vals1{{1.0, 0.6, nan}}; + auto agg1 = + cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON, 3); + test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES); + + fixed_width_column_wrapper expect_vals2{{1.0, 0.6, nan}, {0, 1, 0}}; + auto agg2 = + cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON, 4); + test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES); + + fixed_width_column_wrapper expect_vals3{{1.0, 0.6, nan}, {0, 0, 0}}; + auto agg3 = + cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON, 5); + test_single_agg(keys, vals, expect_keys, expect_vals3, std::move(agg3), force_use_sort_impl::YES); +} + struct groupby_dictionary_correlation_test : public cudf::test::BaseFixture { }; @@ -203,8 +228,7 @@ TEST_F(groupby_dictionary_correlation_test, basic) auto vals = structs{{member_0, member_1}}; fixed_width_column_wrapper expect_keys{1, 2, 3}; - fixed_width_column_wrapper expect_vals{ - {1.0, 0.6, std::numeric_limits::quiet_NaN()}}; + fixed_width_column_wrapper expect_vals{{1.0, 0.6, nan}}; auto agg = cudf::make_correlation_aggregation(cudf::correlation_type::PEARSON); diff --git a/cpp/tests/groupby/covariance_tests.cpp b/cpp/tests/groupby/covariance_tests.cpp index 039fce16222..3a4fbf92387 100644 --- a/cpp/tests/groupby/covariance_tests.cpp +++ b/cpp/tests/groupby/covariance_tests.cpp @@ -175,6 +175,53 @@ TYPED_TEST(groupby_covariance_test, null_values_different) test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg), force_use_sort_impl::YES); } +TYPED_TEST(groupby_covariance_test, min_periods) +{ + using V = TypeParam; + using R = cudf::detail::target_type_t; + + auto keys = fixed_width_column_wrapper{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}}; + auto member_0 = fixed_width_column_wrapper{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}}; + auto member_1 = fixed_width_column_wrapper{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}}; + auto vals = structs{{member_0, member_1}}; + + fixed_width_column_wrapper expect_keys{1, 2, 3}; + + fixed_width_column_wrapper expect_vals1{{1.0, 1.0, 0.0}}; + auto agg1 = cudf::make_covariance_aggregation(3); + test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES); + + fixed_width_column_wrapper expect_vals2{{1.0, 1.0, 0.0}, {0, 1, 0}}; + auto agg2 = cudf::make_covariance_aggregation(4); + test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES); + + fixed_width_column_wrapper expect_vals3{{1.0, 1.0, 0.0}, {0, 0, 0}}; + auto agg3 = cudf::make_covariance_aggregation(5); + test_single_agg(keys, vals, expect_keys, expect_vals3, std::move(agg3), force_use_sort_impl::YES); +} + +TYPED_TEST(groupby_covariance_test, ddof) +{ + using V = TypeParam; + using R = cudf::detail::target_type_t; + + auto keys = fixed_width_column_wrapper{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}}; + auto member_0 = fixed_width_column_wrapper{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}}; + auto member_1 = fixed_width_column_wrapper{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}}; + auto vals = structs{{member_0, member_1}}; + + fixed_width_column_wrapper expect_keys{1, 2, 3}; + + fixed_width_column_wrapper expect_vals1{{2.0, 1.5, 0.0}}; + auto agg1 = cudf::make_covariance_aggregation(1, 2); + test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES); + + auto const inf = std::numeric_limits::infinity(); + fixed_width_column_wrapper expect_vals2{{inf, 3.0, 0.0}, {0, 1, 0}}; + auto agg2 = cudf::make_covariance_aggregation(1, 3); + test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES); +} + struct groupby_dictionary_covariance_test : public cudf::test::BaseFixture { };