Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sort groupby to use non-atomic operation #9035

Merged
Merged
8 changes: 6 additions & 2 deletions cpp/benchmarks/groupby/group_sum_benchmark.cu
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ void BM_pre_sorted_sum(benchmark::State& state)

auto data_it = cudf::detail::make_counting_transform_iterator(
0, [=](cudf::size_type row) { return random_int(0, 100); });
auto valid_it = cudf::detail::make_counting_transform_iterator(
0, [=](cudf::size_type row) { return random_int(0, 100) < 90; });

wrapper keys(data_it, data_it + column_size);
wrapper vals(data_it, data_it + column_size);
wrapper vals(data_it, data_it + column_size, valid_it);

auto keys_table = cudf::table_view({keys});
auto sort_order = cudf::sorted_order(keys_table);
Expand All @@ -111,4 +113,6 @@ BENCHMARK_DEFINE_F(Groupby, PreSorted)(::benchmark::State& state) { BM_pre_sorte
BENCHMARK_REGISTER_F(Groupby, PreSorted)
->UseManualTime()
->Unit(benchmark::kMillisecond)
->Arg(10000000);
->Arg(1000000)
->Arg(10000000)
->Arg(100000000);
11 changes: 9 additions & 2 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby/sort_helper.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/groupby.hpp>
#include <cudf/lists/detail/drop_list_duplicates.hpp>
#include <cudf/table/table.hpp>
Expand Down Expand Up @@ -146,7 +147,10 @@ void aggregate_result_functor::operator()<aggregation::MIN>(aggregation const& a
if (cache.has_result(col_idx, agg)) return;

auto result = [&]() {
if (cudf::is_fixed_width(values.type())) {
auto values_type = cudf::is_dictionary(values.type())
? dictionary_column_view(values).keys().type()
: values.type();
if (cudf::is_fixed_width(values_type)) {
return detail::group_min(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr);
} else {
Expand Down Expand Up @@ -183,7 +187,10 @@ void aggregate_result_functor::operator()<aggregation::MAX>(aggregation const& a
if (cache.has_result(col_idx, agg)) return;

auto result = [&]() {
if (cudf::is_fixed_width(values.type())) {
auto values_type = cudf::is_dictionary(values.type())
? dictionary_column_view(values).keys().type()
: values.type();
if (cudf::is_fixed_width(values_type)) {
return detail::group_max(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr);
} else {
Expand Down
12 changes: 5 additions & 7 deletions cpp/src/groupby/sort/group_max.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ std::unique_ptr<column> group_max(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return type_dispatcher(values.type(),
reduce_functor<aggregation::MAX>{},
values,
num_groups,
group_labels,
stream,
mr);
auto values_type = cudf::is_dictionary(values.type())
? dictionary_column_view(values).keys().type()
: values.type();
return type_dispatcher(
values_type, reduce_functor<aggregation::MAX>{}, values, num_groups, group_labels, stream, mr);
}

} // namespace detail
Expand Down
12 changes: 5 additions & 7 deletions cpp/src/groupby/sort/group_min.cu
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ std::unique_ptr<column> group_min(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return type_dispatcher(values.type(),
reduce_functor<aggregation::MIN>{},
values,
num_groups,
group_labels,
stream,
mr);
auto values_type = cudf::is_dictionary(values.type())
? dictionary_column_view(values).keys().type()
: values.type();
return type_dispatcher(
values_type, reduce_functor<aggregation::MIN>{}, values, num_groups, group_labels, stream, mr);
}

} // namespace detail
Expand Down
199 changes: 166 additions & 33 deletions cpp/src/groupby/sort/group_single_pass_reduction_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,133 @@
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_output_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/reduce.h>

namespace cudf {
namespace groupby {
namespace detail {

// ArgMin binary operator with tuple of (value, index)
template <typename T>
struct ArgMin {
CUDA_HOST_DEVICE_CALLABLE auto operator()(thrust::tuple<T, size_type> const& lhs,
thrust::tuple<T, size_type> const& rhs) const
{
if (thrust::get<1>(lhs) == cudf::detail::ARGMIN_SENTINEL)
return rhs;
else if (thrust::get<1>(rhs) == cudf::detail::ARGMIN_SENTINEL)
return lhs;
else
return thrust::get<0>(lhs) < thrust::get<0>(rhs) ? lhs : rhs;
}
};

// ArgMax binary operator with tuple of (value, index)
template <typename T>
struct ArgMax {
CUDA_HOST_DEVICE_CALLABLE auto operator()(thrust::tuple<T, size_type> const& lhs,
thrust::tuple<T, size_type> const& rhs) const
{
if (thrust::get<1>(lhs) == cudf::detail::ARGMIN_SENTINEL)
return rhs;
else if (thrust::get<1>(rhs) == cudf::detail::ARGMIN_SENTINEL)
return lhs;
else
return thrust::get<0>(lhs) > thrust::get<0>(rhs) ? lhs : rhs;
}
};

struct get_tuple_second_element {
template <typename T>
__device__ size_type operator()(thrust::tuple<T, size_type> const& rhs) const
{
return thrust::get<1>(rhs);
}
};

/**
* @brief Functor to store the boolean value to null mask.
*/
struct bool_to_nullmask {
mutable_column_device_view d_result;
__device__ void operator()(size_type i, bool rhs)
{
if (rhs) {
d_result.set_valid(i);
} else {
d_result.set_null(i);
}
}
};

/**
* @brief Returns index for non-null element, and SENTINEL for null element in a column.
*
*/
struct null_as_sentinel {
column_device_view const col;
size_type const SENTINEL;
__device__ size_type operator()(size_type i) const { return col.is_null(i) ? SENTINEL : i; }
};
Comment on lines +99 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't null_replacement_iterator be used instead?

auto make_null_replacement_iterator(column_device_view const& column,

Copy link
Contributor Author

@karthikeyann karthikeyann Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't. null_replacement_iterator returns values of the column. Here, indices are needed.


/**
* @brief Value accessor for column which supports dictionary column too.
*
* @tparam T Type of the underlying column. For dictionary column, type of the key column.
*/
template <typename T>
struct value_accessor {
column_device_view const col;
bool const is_dict;
value_accessor(column_device_view const& col) : col(col), is_dict(cudf::is_dictionary(col.type()))
{
}

__device__ T value(size_type i) const
{
if (is_dict) {
auto keys = col.child(dictionary_column_view::keys_column_index);
return keys.element<T>(static_cast<size_type>(col.element<dictionary32>(i)));
} else {
return col.element<T>(i);
}
}
__device__ auto operator()(size_type i) const { return value(i); }
};

/**
* @brief Null replaced value accessor for column which supports dictionary column too.
* For null value, returns null `init` value
*
* @tparam T Type of the underlying column. For dictionary column, type of the key column.
*/
template <typename T>
struct null_replaced_value_accessor : value_accessor<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here. Does the existing null_replacement_iterator not work?

Copy link
Contributor Author

@karthikeyann karthikeyann Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null_replacement_iterator right now doesn't support dictionary columns. This functor does. If null_replacement_iterator is updated to support dictionary too, it will add to all kernels using it.
Can I add dictionary support to null_replacement_iterator<T> (T is underlying type, not dictionary32 for dictionary type)?
(could be another PR, column_device_view::begin<T>() could be updated too. It would provide wide support for dictionary columns in most algorithms.
This needs all benchmarks comparison too).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidwendt thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use this?

template <typename KeyType>
auto make_dictionary_pair_iterator(column_device_view const& dictionary_column,
bool has_nulls = true)

Copy link
Contributor

@davidwendt davidwendt Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use the indices for any cudf operations where possible for both run-time and compile-time performance. For example, sorting in general only needs the indices.
You can use this function

column_view get_indices_annotated() const noexcept;

to get the indices column_view decorated with the offset, size, and validity-mask appropriately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hash groupby produces base type column as output.
If we use gather with ARGMIN, or ARGMAX for MIN, or MAX, it would create dictionary column. (added one more test for this, and updated sort groupby to fix this)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds correct to me. Aggregates like min/max return values that already exist in the column so the output would have the same keys as the input. Whereas, sum/prod create totally new values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, here is an example using the the dictionary-pair-iterator along with a null-replacement transformer.

auto f = simple_op.template get_null_replacing_element_transformer<ResultType>();
auto p =
cudf::dictionary::detail::make_dictionary_pair_iterator<ElementType>(*dcol, col.has_nulls());
auto it = thrust::make_transform_iterator(p, f);
return detail::reduce(it, col.size(), simple_op, stream, mr);

I'm inclined to prefer your approach here instead since it simplifies the caller to one value-accessor. The only thing that makes me nervous is that col.element<dictionary32>(i) would be included/inlined for every type and that function contains it's own type-dispatcher call in it. But technically every type is potentially a dictionary key type so I think the same amount of code is generated either way. Anyway, it may be worth looking into using this null-replacement accessor in the reductions code too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dictionary32 means 32 bit int index right?
why is there another type dispatcher for col.element<dictionary32>(i) if index type is already known?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dictionary index types can technically be any unsigned integer type. The element<dictionary32>(i) always returns an int32 value regardless of the underlying indices type.
https://github.com/rapidsai/cudf/blob/branch-21.10/cpp/include/cudf/column/column_device_view.cuh#L415-L421

using super_t = value_accessor<T>;
bool const has_nulls;
T const init;
null_replaced_value_accessor(column_device_view const& col, T const& init, bool const has_nulls)
: super_t(col), init(init), has_nulls(has_nulls)
{
}
__device__ T operator()(size_type i) const
{
return has_nulls && super_t::col.is_null_nocheck(i) ? init : super_t::value(i);
}
};

template <aggregation::Kind K>
struct reduce_functor {
template <typename T>
Expand All @@ -61,51 +174,71 @@ struct reduce_functor {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using DeviceType = device_storage_type_t<T>;
using OpType = cudf::detail::corresponding_operator_t<K>;
using ResultType = cudf::detail::target_type_t<T, K>;
using DeviceType = device_storage_type_t<T>;
using OpType = cudf::detail::corresponding_operator_t<K>;
using ResultType = cudf::detail::target_type_t<T, K>;
using ResultDType = device_storage_type_t<ResultType>;

auto result_type = is_fixed_point<ResultType>()
? data_type{type_to_id<ResultType>(), values.type().scale()}
: data_type{type_to_id<ResultType>()};

std::unique_ptr<column> result =
make_fixed_width_column(result_type,
num_groups,
values.has_nulls() ? mask_state::ALL_NULL : mask_state::UNALLOCATED,
stream,
mr);
make_fixed_width_column(result_type, num_groups, mask_state::UNALLOCATED, stream, mr);

if (values.is_empty()) { return result; }

auto result_table = mutable_table_view({*result});
cudf::detail::initialize_with_identity(result_table, {K}, stream);

auto resultview = mutable_column_device_view::create(result->mutable_view(), stream);
auto valuesview = column_device_view::create(values, stream);

if (!cudf::is_dictionary(values.type())) {
thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
values.size(),
[d_values = *valuesview,
d_result = *resultview,
dest_indices = group_labels.data()] __device__(auto i) {
cudf::detail::update_target_element<DeviceType, K, true, true>{}(
d_result, dest_indices[i], d_values, i);
});
if constexpr (K == aggregation::ARGMAX || K == aggregation::ARGMIN) {
constexpr auto SENTINEL =
(K == aggregation::ARGMAX ? cudf::detail::ARGMAX_SENTINEL : cudf::detail::ARGMIN_SENTINEL);
auto idx_begin =
cudf::detail::make_counting_transform_iterator(0, null_as_sentinel{*valuesview, SENTINEL});
// dictionary keys are sorted, so dictionary32 index comparison is enough.
auto column_begin = valuesview->begin<DeviceType>();
auto begin = thrust::make_zip_iterator(thrust::make_tuple(column_begin, idx_begin));
auto result_begin = thrust::make_transform_output_iterator(resultview->begin<ResultDType>(),
get_tuple_second_element{});
using OpType =
std::conditional_t<(K == aggregation::ARGMAX), ArgMax<DeviceType>, ArgMin<DeviceType>>;
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
begin,
thrust::make_discard_iterator(),
result_begin,
thrust::equal_to<size_type>{},
OpType{});
} else {
thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
values.size(),
[d_values = *valuesview,
d_result = *resultview,
dest_indices = group_labels.data()] __device__(auto i) {
cudf::detail::update_target_element<dictionary32, K, true, true>{}(
d_result, dest_indices[i], d_values, i);
});
auto init = OpType::template identity<DeviceType>();
auto begin = cudf::detail::make_counting_transform_iterator(
0, null_replaced_value_accessor{*valuesview, init, values.has_nulls()});
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
begin,
thrust::make_discard_iterator(),
resultview->begin<ResultDType>(),
thrust::equal_to<size_type>{},
OpType{});
}

if (values.has_nulls()) {
rmm::device_uvector<bool> validity(num_groups, stream);
thrust::reduce_by_key(rmm::exec_policy(stream),
group_labels.data(),
group_labels.data() + group_labels.size(),
cudf::detail::make_validity_iterator(*valuesview),
thrust::make_discard_iterator(),
validity.begin(),
thrust::equal_to<size_type>{},
thrust::logical_or<bool>{});
auto [null_mask, null_count] = cudf::detail::valid_if(
validity.begin(), validity.end(), thrust::identity<bool>{}, stream, mr);
result->set_null_mask(std::move(null_mask));
result->set_null_count(null_count);
}
return result;
}

Expand Down
24 changes: 24 additions & 0 deletions cpp/tests/groupby/max_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ TEST_F(groupby_dictionary_max_test, basic)
force_use_sort_impl::YES);
}

TEST_F(groupby_dictionary_max_test, fixed_width)
{
using V = int64_t;

// clang-format off
fixed_width_column_wrapper<K> keys{ 1, 2, 3, 1, 2, 2, 1, 3, 3, 2 };
dictionary_column_wrapper<V> vals{ 0xABC, 0xBBB, 0xF1, 0xAAA, 0xFFF, 0xBAA, 0xAAA, 0x01, 0xF1, 0xEEE};
fixed_width_column_wrapper<K> expect_keys { 1, 2, 3 };
fixed_width_column_wrapper<V> expect_vals_w({ 0xABC, 0xFFF, 0xF1 });
// clang-format on

test_single_agg(keys,
vals,
expect_keys,
expect_vals_w,
cudf::make_max_aggregation<cudf::groupby_aggregation>());
test_single_agg(keys,
vals,
expect_keys,
expect_vals_w,
cudf::make_max_aggregation<cudf::groupby_aggregation>(),
force_use_sort_impl::YES);
}

template <typename T>
struct FixedPointTestBothReps : public cudf::test::BaseFixture {
};
Expand Down
24 changes: 24 additions & 0 deletions cpp/tests/groupby/min_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,30 @@ TEST_F(groupby_dictionary_min_test, basic)
force_use_sort_impl::YES);
}

TEST_F(groupby_dictionary_min_test, fixed_width)
{
using V = int64_t;

// clang-format off
fixed_width_column_wrapper<K> keys{ 1, 2, 3, 1, 2, 2, 1, 3, 3, 2 };
dictionary_column_wrapper<V> vals{ 0xABC, 0xBBB, 0xF1, 0xAAA, 0xFFF, 0xBAA, 0xAAA, 0x01, 0xF1, 0xEEE};
fixed_width_column_wrapper<K> expect_keys { 1, 2, 3 };
fixed_width_column_wrapper<V> expect_vals_w({ 0xAAA, 0xBAA, 0x01 });
// clang-format on

test_single_agg(keys,
vals,
expect_keys,
expect_vals_w,
cudf::make_min_aggregation<cudf::groupby_aggregation>());
test_single_agg(keys,
vals,
expect_keys,
expect_vals_w,
cudf::make_min_aggregation<cudf::groupby_aggregation>(),
force_use_sort_impl::YES);
}

template <typename T>
struct FixedPointTestBothReps : public cudf::test::BaseFixture {
};
Expand Down