Skip to content

Commit

Permalink
add min_periods, ddof to groupby covariance, & correlation aggregation (
Browse files Browse the repository at this point in the history
#9492)

Addresses part of #8691
Add min_periods and ddof parameters to libcudf groupby covariance and Pearson correlation (python needs this)

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9492
  • Loading branch information
karthikeyann authored Oct 26, 2021
1 parent d6b624c commit c0951ba
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 50 deletions.
11 changes: 8 additions & 3 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,20 +503,25 @@ std::unique_ptr<Base> make_merge_m2_aggregation();
*
* Compute covariance between two columns.
* The input columns are child columns of a non-nullable struct columns.
* @param min_periods Minimum number of non-null observations required to produce a result.
* @param ddof Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N is
* the number of non-null observations.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_covariance_aggregation();
std::unique_ptr<Base> make_covariance_aggregation(size_type min_periods = 1, size_type ddof = 1);

/**
* @brief Factory to create a CORRELATION aggregation
*
* Compute correlation coefficient between two columns.
* The input columns are child columns of a non-nullable struct columns.
*
* @param[in] type: correlation_type
* @param type correlation_type
* @param min_periods Minimum number of non-null observations required to produce a result.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type);
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type,
size_type min_periods = 1);

/**
* @brief Factory to create a TDIGEST aggregation
Expand Down
26 changes: 23 additions & 3 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,14 @@ class merge_m2_aggregation final : public groupby_aggregation {
*/
class covariance_aggregation final : public groupby_aggregation {
public:
explicit covariance_aggregation() : aggregation{COVARIANCE} {}
explicit covariance_aggregation(size_type min_periods, size_type ddof)
: aggregation{COVARIANCE}, _min_periods{min_periods}, _ddof(ddof)
{
}
size_type _min_periods;
size_type _ddof;

size_t do_hash() const override { return this->aggregation::do_hash() ^ hash_impl(); }

std::unique_ptr<aggregation> clone() const override
{
Expand All @@ -913,15 +920,25 @@ class covariance_aggregation final : public groupby_aggregation {
return collector.visit(col_type, *this);
}
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }

protected:
size_t hash_impl() const
{
return std::hash<size_type>{}(_min_periods) ^ std::hash<size_type>{}(_ddof);
}
};

/**
* @brief Derived aggregation class for specifying CORRELATION aggregation
*/
class correlation_aggregation final : public groupby_aggregation {
public:
explicit correlation_aggregation(correlation_type type) : aggregation{CORRELATION}, _type{type} {}
explicit correlation_aggregation(correlation_type type, size_type min_periods)
: aggregation{CORRELATION}, _type{type}, _min_periods{min_periods}
{
}
correlation_type _type;
size_type _min_periods;

bool is_equal(aggregation const& _other) const override
{
Expand All @@ -944,7 +961,10 @@ class correlation_aggregation final : public groupby_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }

protected:
size_t hash_impl() const { return std::hash<int>{}(static_cast<int>(_type)); }
size_t hash_impl() const
{
return std::hash<int>{}(static_cast<int>(_type)) ^ std::hash<size_type>{}(_min_periods);
}
};

/**
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -713,23 +713,25 @@ template std::unique_ptr<groupby_aggregation> make_merge_m2_aggregation<groupby_

/// Factory to create a COVARIANCE aggregation
template <typename Base>
std::unique_ptr<Base> make_covariance_aggregation()
std::unique_ptr<Base> make_covariance_aggregation(size_type min_periods, size_type ddof)
{
return std::make_unique<detail::covariance_aggregation>();
return std::make_unique<detail::covariance_aggregation>(min_periods, ddof);
}
template std::unique_ptr<aggregation> make_covariance_aggregation<aggregation>();
template std::unique_ptr<groupby_aggregation> make_covariance_aggregation<groupby_aggregation>();
template std::unique_ptr<aggregation> make_covariance_aggregation<aggregation>(
size_type min_periods, size_type ddof);
template std::unique_ptr<groupby_aggregation> make_covariance_aggregation<groupby_aggregation>(
size_type min_periods, size_type ddof);

/// Factory to create a CORRELATION aggregation
template <typename Base>
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type)
std::unique_ptr<Base> make_correlation_aggregation(correlation_type type, size_type min_periods)
{
return std::make_unique<detail::correlation_aggregation>(type);
return std::make_unique<detail::correlation_aggregation>(type, min_periods);
}
template std::unique_ptr<aggregation> make_correlation_aggregation<aggregation>(
correlation_type type);
correlation_type type, size_type min_periods);
template std::unique_ptr<groupby_aggregation> make_correlation_aggregation<groupby_aggregation>(
correlation_type type);
correlation_type type, size_type min_periods);

template <typename Base>
std::unique_ptr<Base> make_tdigest_aggregation(int max_centroids)
Expand Down
50 changes: 29 additions & 21 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ void aggregate_result_functor::operator()<aggregation::COVARIANCE>(aggregation c
CUDF_EXPECTS(values.num_children() == 2,
"Input to `groupby covariance` must be a structs column having 2 children columns.");

auto const& cov_agg = dynamic_cast<cudf::detail::covariance_aggregation const&>(agg);
// Covariance only for valid values in both columns.
// in non-identical null mask cases, this prevents caching of the results - STD, MEAN, COUNT.
auto [_, values_child0, values_child1] =
Expand All @@ -596,6 +597,8 @@ void aggregate_result_functor::operator()<aggregation::COVARIANCE>(aggregation c
count,
mean0,
mean1,
cov_agg._min_periods,
cov_agg._ddof,
stream,
mr));
};
Expand Down Expand Up @@ -629,28 +632,33 @@ void aggregate_result_functor::operator()<aggregation::CORRELATION>(aggregation
aggregate_result_functor(values_child0, helper, cache, stream, mr).operator()<aggregation::STD>(*std_agg);
aggregate_result_functor(values_child1, helper, cache, stream, mr).operator()<aggregation::STD>(*std_agg);

auto const stddev0 = cache.get_result(values_child0, *std_agg);
auto const stddev1 = cache.get_result(values_child1, *std_agg);

auto mean_agg = make_mean_aggregation();
auto const mean0 = cache.get_result(values_child0, *mean_agg);
auto const mean1 = cache.get_result(values_child1, *mean_agg);
auto count_agg = make_count_aggregation();
auto const count = cache.get_result(values_child0, *count_agg);

// Compute covariance here to avoid repeated computation of mean & count
auto cov_agg = make_covariance_aggregation();
cache.add_result(values,
*cov_agg,
detail::group_covariance(get_grouped_values().child(0),
get_grouped_values().child(1),
helper.group_labels(stream),
helper.num_groups(stream),
count,
mean0,
mean1,
stream,
mr));
auto cov_agg = make_covariance_aggregation(corr_agg._min_periods);
if (not cache.has_result(values, *cov_agg)) {
auto mean_agg = make_mean_aggregation();
auto const mean0 = cache.get_result(values_child0, *mean_agg);
auto const mean1 = cache.get_result(values_child1, *mean_agg);
auto count_agg = make_count_aggregation();
auto const count = cache.get_result(values_child0, *count_agg);

auto const& cov_agg_obj = dynamic_cast<cudf::detail::covariance_aggregation const&>(*cov_agg);
cache.add_result(values,
*cov_agg,
detail::group_covariance(get_grouped_values().child(0),
get_grouped_values().child(1),
helper.group_labels(stream),
helper.num_groups(stream),
count,
mean0,
mean1,
cov_agg_obj._min_periods,
cov_agg_obj._ddof,
stream,
mr));
}

auto const stddev0 = cache.get_result(values_child0, *std_agg);
auto const stddev1 = cache.get_result(values_child1, *std_agg);
auto const covariance = cache.get_result(values, *cov_agg);
cache.add_result(
values, agg, detail::group_correlation(covariance, stddev0, stddev1, stream, mr));
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/groupby/sort/group_correlation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
column_view const& count,
column_view const& mean_0,
column_view const& mean_1,
size_type min_periods,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand Down Expand Up @@ -140,8 +142,13 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,

auto d_values_0 = column_device_view::create(values_0, stream);
auto d_values_1 = column_device_view::create(values_1, stream);
covariance_transform<result_type> covariance_transform_op{
*d_values_0, *d_values_1, mean0_ptr, mean1_ptr, count.data<size_type>(), group_labels.begin()};
covariance_transform<result_type> covariance_transform_op{*d_values_0,
*d_values_1,
mean0_ptr,
mean1_ptr,
count.data<size_type>(),
group_labels.begin(),
ddof};

auto result = make_numeric_column(
data_type(type_to_id<result_type>()), num_groups, mask_state::UNALLOCATED, stream, mr);
Expand All @@ -157,8 +164,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
thrust::make_discard_iterator(),
d_result);

auto is_null = [ddof = covariance_transform_op.ddof] __device__(size_type group_size) {
return not(group_size == 0 or group_size - ddof <= 0);
auto is_null = [ddof, min_periods] __device__(size_type group_size) {
return not(group_size == 0 or group_size - ddof <= 0 or group_size < min_periods);
};
auto [new_nullmask, null_count] =
cudf::detail::valid_if(count.begin<size_type>(), count.end<size_type>(), is_null, stream, mr);
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ std::unique_ptr<column> group_merge_m2(column_view const& values,
* @param count The count of valid rows of the grouped values of both columns
* @param mean_0 The mean of the first grouped values column
* @param mean_1 The mean of the second grouped values column
* @param min_periods The minimum number of non-null rows required to consider the covariance
* @param ddof The delta degrees of freedom used in the calculation of the variance
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*/
Expand All @@ -461,6 +463,8 @@ std::unique_ptr<column> group_covariance(column_view const& values_0,
column_view const& count,
column_view const& mean_0,
column_view const& mean_1,
size_type min_periods,
size_type ddof,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

Expand Down
46 changes: 35 additions & 11 deletions cpp/tests/groupby/correlation_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ using namespace cudf::test::iterators;
namespace cudf {
namespace test {

using structs = structs_column_wrapper;
constexpr auto nan = std::numeric_limits<double>::quiet_NaN();
using structs = structs_column_wrapper;

template <typename V>
struct groupby_correlation_test : public cudf::test::BaseFixture {
Expand All @@ -54,8 +55,7 @@ TYPED_TEST(groupby_correlation_test, basic)
auto vals = structs{{member_0, member_1}};

fixed_width_column_wrapper<K> expect_keys{1, 2, 3};
fixed_width_column_wrapper<R, double> expect_vals{
{1.0, 0.6, std::numeric_limits<double>::quiet_NaN()}};
fixed_width_column_wrapper<R, double> expect_vals{{1.0, 0.6, nan}};

auto agg =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON);
Expand Down Expand Up @@ -129,8 +129,7 @@ TYPED_TEST(groupby_correlation_test, null_keys_and_values)
auto vals = structs{{val0, val1}};

fixed_width_column_wrapper<K> expect_keys({1, 2, 3, 4}, no_nulls());
fixed_width_column_wrapper<R> expect_vals(
{1.0, 0.6, std::numeric_limits<double>::quiet_NaN(), 0.}, {1, 1, 1, 0});
fixed_width_column_wrapper<R> expect_vals({1.0, 0.6, nan, 0.}, {1, 1, 1, 0});

auto agg =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON);
Expand All @@ -153,8 +152,7 @@ TYPED_TEST(groupby_correlation_test, null_values_same)
auto vals = structs{{val0, val1}};

fixed_width_column_wrapper<K> expect_keys({1, 2, 3, 4}, no_nulls());
fixed_width_column_wrapper<R> expect_vals(
{1.0, 0.6, std::numeric_limits<double>::quiet_NaN(), 0.}, {1, 1, 1, 0});
fixed_width_column_wrapper<R> expect_vals({1.0, 0.6, nan, 0.}, {1, 1, 1, 0});

auto agg =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON);
Expand All @@ -181,14 +179,41 @@ TYPED_TEST(groupby_correlation_test, null_values_different)
auto vals = structs{{val0, val1}};

fixed_width_column_wrapper<K> expect_keys({1, 2, 3, 4}, no_nulls());
fixed_width_column_wrapper<R> expect_vals({1.0, 0., std::numeric_limits<double>::quiet_NaN(), 0.},
{1, 1, 1, 0});
fixed_width_column_wrapper<R> expect_vals({1.0, 0., nan, 0.}, {1, 1, 1, 0});

auto agg =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON);
test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg), force_use_sort_impl::YES);
}

TYPED_TEST(groupby_correlation_test, min_periods)
{
using V = TypeParam;
using R = cudf::detail::target_type_t<V, aggregation::CORRELATION>;

auto keys = fixed_width_column_wrapper<K>{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}};
auto member_0 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}};
auto member_1 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}};
auto vals = structs{{member_0, member_1}};

fixed_width_column_wrapper<K> expect_keys{1, 2, 3};

fixed_width_column_wrapper<R, double> expect_vals1{{1.0, 0.6, nan}};
auto agg1 =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON, 3);
test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES);

fixed_width_column_wrapper<R, double> expect_vals2{{1.0, 0.6, nan}, {0, 1, 0}};
auto agg2 =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON, 4);
test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES);

fixed_width_column_wrapper<R, double> expect_vals3{{1.0, 0.6, nan}, {0, 0, 0}};
auto agg3 =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON, 5);
test_single_agg(keys, vals, expect_keys, expect_vals3, std::move(agg3), force_use_sort_impl::YES);
}

struct groupby_dictionary_correlation_test : public cudf::test::BaseFixture {
};

Expand All @@ -203,8 +228,7 @@ TEST_F(groupby_dictionary_correlation_test, basic)
auto vals = structs{{member_0, member_1}};

fixed_width_column_wrapper<K> expect_keys{1, 2, 3};
fixed_width_column_wrapper<R, double> expect_vals{
{1.0, 0.6, std::numeric_limits<double>::quiet_NaN()}};
fixed_width_column_wrapper<R, double> expect_vals{{1.0, 0.6, nan}};

auto agg =
cudf::make_correlation_aggregation<groupby_aggregation>(cudf::correlation_type::PEARSON);
Expand Down
47 changes: 47 additions & 0 deletions cpp/tests/groupby/covariance_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,53 @@ TYPED_TEST(groupby_covariance_test, null_values_different)
test_single_agg(keys, vals, expect_keys, expect_vals, std::move(agg), force_use_sort_impl::YES);
}

TYPED_TEST(groupby_covariance_test, min_periods)
{
using V = TypeParam;
using R = cudf::detail::target_type_t<V, aggregation::COVARIANCE>;

auto keys = fixed_width_column_wrapper<K>{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}};
auto member_0 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}};
auto member_1 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}};
auto vals = structs{{member_0, member_1}};

fixed_width_column_wrapper<K> expect_keys{1, 2, 3};

fixed_width_column_wrapper<R, double> expect_vals1{{1.0, 1.0, 0.0}};
auto agg1 = cudf::make_covariance_aggregation<groupby_aggregation>(3);
test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES);

fixed_width_column_wrapper<R, double> expect_vals2{{1.0, 1.0, 0.0}, {0, 1, 0}};
auto agg2 = cudf::make_covariance_aggregation<groupby_aggregation>(4);
test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES);

fixed_width_column_wrapper<R, double> expect_vals3{{1.0, 1.0, 0.0}, {0, 0, 0}};
auto agg3 = cudf::make_covariance_aggregation<groupby_aggregation>(5);
test_single_agg(keys, vals, expect_keys, expect_vals3, std::move(agg3), force_use_sort_impl::YES);
}

TYPED_TEST(groupby_covariance_test, ddof)
{
using V = TypeParam;
using R = cudf::detail::target_type_t<V, aggregation::COVARIANCE>;

auto keys = fixed_width_column_wrapper<K>{{1, 2, 3, 1, 2, 2, 1, 3, 3, 2}};
auto member_0 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 2, 3, 3, 1, 1, 4}};
auto member_1 = fixed_width_column_wrapper<V>{{1, 1, 1, 2, 0, 3, 3, 1, 1, 2}};
auto vals = structs{{member_0, member_1}};

fixed_width_column_wrapper<K> expect_keys{1, 2, 3};

fixed_width_column_wrapper<R, double> expect_vals1{{2.0, 1.5, 0.0}};
auto agg1 = cudf::make_covariance_aggregation<groupby_aggregation>(1, 2);
test_single_agg(keys, vals, expect_keys, expect_vals1, std::move(agg1), force_use_sort_impl::YES);

auto const inf = std::numeric_limits<double>::infinity();
fixed_width_column_wrapper<R, double> expect_vals2{{inf, 3.0, 0.0}, {0, 1, 0}};
auto agg2 = cudf::make_covariance_aggregation<groupby_aggregation>(1, 3);
test_single_agg(keys, vals, expect_keys, expect_vals2, std::move(agg2), force_use_sort_impl::YES);
}

struct groupby_dictionary_covariance_test : public cudf::test::BaseFixture {
};

Expand Down

0 comments on commit c0951ba

Please sign in to comment.