Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cudf::group_by (hash) for decimal32 and decimal64 #7190

Merged
merged 13 commits into from
Feb 5, 2021
83 changes: 83 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,34 @@ struct update_target_element<
}
};

template <typename Source, bool target_has_nulls, bool source_has_nulls>
struct update_target_element<Source,
aggregation::MIN,
target_has_nulls,
source_has_nulls,
std::enable_if_t<is_fixed_point<Source>()>> {
__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
{
#if (__CUDACC_VER_MAJOR__ != 10) or (__CUDACC_VER_MINOR__ != 2)

if (source_has_nulls and source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MIN>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;
codereport marked this conversation as resolved.
Show resolved Hide resolved

atomicMin(&target.element<DeviceTarget>(target_index),
static_cast<DeviceTarget>(source.element<DeviceSource>(source_index)));

if (target_has_nulls and target.is_null(target_index)) { target.set_valid(target_index); }

#endif
}
};

template <typename Source, bool target_has_nulls, bool source_has_nulls>
struct update_target_element<
Source,
Expand All @@ -151,6 +179,34 @@ struct update_target_element<
}
};

template <typename Source, bool target_has_nulls, bool source_has_nulls>
struct update_target_element<Source,
aggregation::MAX,
target_has_nulls,
source_has_nulls,
std::enable_if_t<is_fixed_point<Source>()>> {
__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
{
#if (__CUDACC_VER_MAJOR__ != 10) or (__CUDACC_VER_MINOR__ != 2)

if (source_has_nulls and source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::MAX>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;

atomicMax(&target.element<DeviceTarget>(target_index),
static_cast<DeviceTarget>(source.element<DeviceSource>(source_index)));

if (target_has_nulls and target.is_null(target_index)) { target.set_valid(target_index); }

#endif
}
};

template <typename Source, bool target_has_nulls, bool source_has_nulls>
struct update_target_element<
Source,
Expand All @@ -173,6 +229,33 @@ struct update_target_element<
}
};

template <typename Source, bool target_has_nulls, bool source_has_nulls>
struct update_target_element<Source,
aggregation::SUM,
target_has_nulls,
source_has_nulls,
std::enable_if_t<is_fixed_point<Source>()>> {
__device__ void operator()(mutable_column_device_view target,
size_type target_index,
column_device_view source,
size_type source_index) const noexcept
{
#if (__CUDACC_VER_MAJOR__ != 10) or (__CUDACC_VER_MINOR__ != 2)

if (source_has_nulls and source.is_null(source_index)) { return; }

using Target = target_type_t<Source, aggregation::SUM>;
using DeviceTarget = device_storage_type_t<Target>;
using DeviceSource = device_storage_type_t<Source>;

atomicAdd(&target.element<DeviceTarget>(target_index),
static_cast<DeviceTarget>(source.element<DeviceSource>(source_index)));

if (target_has_nulls and target.is_null(target_index)) { target.set_valid(target_index); }
#endif
}
};

/**
* @brief Function object to update a single element in a target column using
* the dictionary key addressed by the specific index.
Expand Down
18 changes: 18 additions & 0 deletions cpp/tests/groupby/group_count_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ TYPED_TEST(FixedPointTestBothReps, GroupBySumProductMinMaxDecimalAsValue)
auto const expect_vals_min = fp_wrapper{{0, 1, 2}, scale};
auto const expect_vals_max = fp_wrapper{{6, 9, 8}, scale};

// group_by sort tests

auto agg1 = cudf::make_sum_aggregation();
test_single_agg(
keys, vals, expect_keys, expect_vals_sum, std::move(agg1), force_use_sort_impl::YES);
Expand All @@ -237,6 +239,22 @@ TYPED_TEST(FixedPointTestBothReps, GroupBySumProductMinMaxDecimalAsValue)
EXPECT_THROW(
test_single_agg(keys, vals, expect_keys, {}, std::move(agg4), force_use_sort_impl::YES),
cudf::logic_error);

// commented out until we drop support for CUDA 10.2
// group_by hash tests

// auto agg5 = cudf::make_sum_aggregation();
// test_single_agg(keys, vals, expect_keys, expect_vals_sum, std::move(agg5));

// auto agg6 = cudf::make_min_aggregation();
// test_single_agg(keys, vals, expect_keys, expect_vals_min, std::move(agg6));

// auto agg7 = cudf::make_max_aggregation();
// test_single_agg(keys, vals, expect_keys, expect_vals_max, std::move(agg7));

// auto agg8 = cudf::make_product_aggregation();
// EXPECT_THROW(test_single_agg(keys, vals, expect_keys, {}, std::move(agg8)),
// cudf::logic_error);
}
}

Expand Down