
Update sort groupby to use non-atomic operation #9035

Merged
8 changes: 6 additions & 2 deletions cpp/benchmarks/groupby/group_sum_benchmark.cu
@@ -83,9 +83,11 @@ void BM_pre_sorted_sum(benchmark::State& state)
 
   auto data_it = cudf::detail::make_counting_transform_iterator(
     0, [=](cudf::size_type row) { return random_int(0, 100); });
+  auto valid_it = cudf::detail::make_counting_transform_iterator(
+    0, [=](cudf::size_type row) { return random_int(0, 100) < 90; });
 
   wrapper keys(data_it, data_it + column_size);
-  wrapper vals(data_it, data_it + column_size);
+  wrapper vals(data_it, data_it + column_size, valid_it);
 
   auto keys_table = cudf::table_view({keys});
   auto sort_order = cudf::sorted_order(keys_table);
@@ -111,4 +113,6 @@ BENCHMARK_DEFINE_F(Groupby, PreSorted)(::benchmark::State& state) { BM_pre_sorte
 BENCHMARK_REGISTER_F(Groupby, PreSorted)
   ->UseManualTime()
   ->Unit(benchmark::kMillisecond)
-  ->Arg(10000000);
+  ->Arg(1000000)
+  ->Arg(10000000)
+  ->Arg(100000000);
175 changes: 175 additions & 0 deletions cpp/include/cudf/utilities/output_writer_iterator.cuh
@@ -0,0 +1,175 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Output writer iterator
#pragma once

#include <thrust/iterator/iterator_adaptor.h>

namespace thrust {
namespace detail {

// Proxy reference that calls BinaryFunction with Iterator value and the rhs of assignment operator
template <typename BinaryFunction, typename Iterator>
class output_writer_iterator_proxy {
public:
__host__ __device__ output_writer_iterator_proxy(const Iterator& index_iter, BinaryFunction fun)
: index_iter(index_iter), fun(fun)
{
}
template <typename T>
__host__ __device__ output_writer_iterator_proxy operator=(const T& x)
{
fun(*index_iter, x);
return *this;
}

private:
Iterator index_iter;
BinaryFunction fun;
};

// Register output_writer_iterator_proxy with 'is_proxy_reference' from
// type_traits to enable its use with algorithms.
template <class BinaryFunction, class Iterator>
struct is_proxy_reference<output_writer_iterator_proxy<BinaryFunction, Iterator>>
: public thrust::detail::true_type {
};

} // namespace detail
} // namespace thrust

namespace cudf {
/**
* @brief Transform output iterator with custom writer binary function which takes index and value.
*
* @code {.cpp}
* #include <cudf/utilities/output_writer_iterator.cuh>
* #include <thrust/device_vector.h>
* #include <thrust/iterator/counting_iterator.h>
* #include <thrust/iterator/transform_iterator.h>
*
* struct set_bits_field {
* int* bitfield;
* __device__ inline void set_bit(size_t bit_index)
* {
* atomicOr(&bitfield[bit_index/32], (int{1} << (bit_index % 32)));
* }
* __device__ inline void clear_bit(size_t bit_index)
* {
* atomicAnd(&bitfield[bit_index / 32], ~(int{1} << (bit_index % 32)));
* }
* // Index, value
* __device__ void operator()(size_t i, bool x)
* {
* if (x)
* set_bit(i);
* else
* clear_bit(i);
* }
* };
*
* thrust::device_vector<int> v(1, 0x0000);
* auto result_begin = cudf::make_output_writer_iterator(thrust::make_counting_iterator(0),
* set_bits_field{v.data().get()});
* auto value = thrust::make_transform_iterator(thrust::make_counting_iterator(0),
*                                              [] __device__ (int x) { return x%2; });
* thrust::copy(thrust::device, value, value+32, result_begin);
* int(v[0]); // returns 0xaaaaaaaa;
* @endcode
*
* @tparam BinaryFunction Binary function to be called with the Iterator value and the rhs of
* assignment operator.
* @tparam Iterator iterator type that acts as index of the output.
*/
template <typename BinaryFunction, typename Iterator>
class output_writer_iterator
Contributor

This looks like a lot of machinery and I'm not clear about what its purpose is.

Contributor Author

I created this first to test whether I could set the element and call set_valid at the same time using a single reduce_by_key with thrust::optional, but thrust::optional was always slow. So I reverted to using two reduce_by_key calls: one for the elements, another for the null_mask.

To use reduce_by_key with the null_mask, it needs a temporary bool buffer and valid_if. To avoid this, I used this transform_output_writer_iterator. Anyway, this is purely to avoid allocating the temporary bool buffer; it doesn't affect performance much. I will revert to using the temporary bool buffer and remove this iterator. (I thought there might be other use cases for this iterator in cudf, especially with null_mask.)
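To make the two-pass approach concrete, here is a minimal sketch of how the second reduce_by_key could write per-group validity straight into a null mask through this iterator, with no temporary bool buffer. The helper and its names (d_mask, d_validity, d_labels, num_rows) are illustrative, not the PR's actual code:

#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/output_writer_iterator.cuh>
#include <thrust/execution_policy.h>
#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/reduce.h>

// Hypothetical helper: reduce one validity bool per row into one bit per group.
void reduce_validity(cudf::bitmask_type* d_mask,       // output: one bit per group
                     bool const* d_validity,           // input: one bool per row
                     cudf::size_type const* d_labels,  // input: sorted group labels
                     cudf::size_type num_rows)
{
  // Assigning through this iterator sets or clears the group's null-mask bit
  // instead of storing a bool.
  auto validity_out = cudf::make_output_writer_iterator(
    thrust::make_counting_iterator<cudf::size_type>(0),
    [d_mask] __device__(cudf::size_type group_idx, bool group_has_valid) {
      if (group_has_valid)
        cudf::set_bit(d_mask, group_idx);
      else
        cudf::clear_bit(d_mask, group_idx);
    });
  thrust::reduce_by_key(thrust::device,
                        d_labels,
                        d_labels + num_rows,
                        d_validity,
                        thrust::make_discard_iterator(),  // group keys are not needed
                        validity_out,
                        thrust::equal_to<cudf::size_type>{},
                        thrust::logical_or<bool>{});
}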

Contributor

This machinery is the bare minimum needed to use a proxy object for assignments in thrust. The question is whether the proxy object is useful; if it is, then the machinery is just overhead.

A better name, imho, would be assignment_iterator.

From what I see, the use case is overriding the assignment operation. In an output iterator, the assignment operator is used to take the value on the rhs and assign it to the lhs. Here, we intercept that assignment and call a binary operator binop(lhs, rhs) that can override it.

This PR uses the assignment_iterator with a lambda that captures a null mask. The proxy intercepts the lhs int and the rhs bool, then invokes the lambda, which calls set_valid or set_null on the captured mask. It is the output version of make_validity_iterator.

Contributor

I think we should be more specific here. I would like to see a make_validity_output_iterator that is very focused on dealing with null masks of a particular column. With an interface like this, we can start to experiment with opportunistic coalescing of null mask assignments using cooperative groups. Also, this API makes it very obvious that we are doing individual bit assignments and that coalescing them by changing the calling code could prove more performant.

auto make_validity_output_iterator(mutable_column_device_view const& destination);
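For illustration, such a factory could be a thin wrapper over the proxy-based iterator. A hedged sketch (hypothetical, not part of this PR), assuming mutable_column_device_view's set_valid/set_null device methods:

auto make_validity_output_iterator(cudf::mutable_column_device_view const& destination)
{
  return cudf::make_output_writer_iterator(
    thrust::make_counting_iterator<cudf::size_type>(0),
    // The device view is trivially copyable, so it can be captured by value.
    [destination] __device__(cudf::size_type i, bool valid) {
      if (valid)
        destination.set_valid(i);
      else
        destination.set_null(i);
    });
}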

Contributor

The implementation may well use the proxy-based iterator you've created, but there would be a very clear use case for it, and other developers would have an easy-to-use API for that use case.

Contributor

@elstehle IIRC you've used thrust a lot... have you ever run into an instance where this sort of assignment interception would be useful?

Sorry, I don't have a concrete use case in the back of my head right now.

Contributor

@jrhemstad I'm still confused. Why exactly is cooperative groups not an option here? I was thinking we could try coalesced_threads(), followed by labeled_partition() and reduce(), then use thread_rank() == 0 to indicate which threads should write the output, and use atomics to make sure uncoordinated writes don't interfere with one another. I doubt this will be fast in the majority of use cases, but I am not sure why it would not work. It would be an experiment to see if we can speed up the best-case scenario, where all/most writes are sequential.
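For concreteness, a sketch of that experiment (illustrative only; assumes CUDA 11+ cooperative groups with labeled_partition and cg::reduce, and handles only setting bits, since clearing would need a second accumulated mask):

#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>
namespace cg = cooperative_groups;

// Coalesce the bit-sets of all currently-active threads that target the same
// 32-bit word into a single atomicOr.
__device__ void set_bit_coalesced(unsigned int* mask, int bit_index)
{
  auto active = cg::coalesced_threads();
  // Partition the active threads by the word they update (idx/32 as the label).
  auto word_group = cg::labeled_partition(active, bit_index / 32);
  // OR together each thread's single-bit contribution within the partition.
  unsigned int word_bits =
    cg::reduce(word_group, 1u << (bit_index % 32), cg::bit_or<unsigned int>());
  // One thread per word performs the write.
  if (word_group.thread_rank() == 0) { atomicOr(&mask[bit_index / 32], word_bits); }
}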

Contributor

I was under the impression cooperative groups was able to detect which threads were active

They can. Determining which threads are active is not the problem.

The problem is coordinating and detecting which threads are attempting to update the same bits in a given 4B or 8B word. For example, if any two threads in a grid want to update bits i and i+1, how do you detect that scenario without some form of communication? coalesced_threads doesn't help you here because whether or not the threads are coalesced doesn't tell you anything about what bits they are updating.

Furthermore, even if the two threads are both active at the same time in the same warp (which you could never rely on), how do you detect that t0 wants to update bit i and t1 wants to update bit i+1, when it could just as well be that t0 updates i and t1 updates i + 1042?

Contributor @cwharris, Aug 18, 2021

That's where @karthikeyann's proxy class comes into play. It knows the index and the value, and the lambda is responsible for using that index and value to assign the appropriate bit in a captured null mask. It appears we have enough information to attempt opportunistic concurrency here. I'm not saying it will be beneficial, only that it seems possible and might be worth an experiment.

Contributor @cwharris, Aug 18, 2021

The problem is coordinating and detecting which threads are attempting to update the same bits in a given 4B or 8B word.

The proxy class has the index, and we can pass this to labeled_partition.

For example, if any two threads in a grid want to update bits i and i+1, how do you detect that scenario without some form of communication?

For the sake of the experiment, the communication would be limited to 32 threads within a warp. Communicating across warps would require shmem, and since we're in a lambda called by thrust, I'm not sure if/how that would work.

coalesced_threads doesn't help you here because whether or not the threads are coalesced doesn't tell you anything about what bits they are updating.

coalesced_threads is just to prevent UB/hangs; labeled_partition would be responsible for determining which writes can be coalesced, using idx/32 as the label (with some offset, if necessary).

: public thrust::iterator_adaptor<
output_writer_iterator<BinaryFunction, Iterator>,
Iterator,
thrust::use_default,
thrust::use_default,
thrust::use_default,
thrust::detail::output_writer_iterator_proxy<BinaryFunction, Iterator>> {
public:
// parent class.
typedef thrust::iterator_adaptor<
output_writer_iterator<BinaryFunction, Iterator>,
Iterator,
thrust::use_default,
thrust::use_default,
thrust::use_default,
thrust::detail::output_writer_iterator_proxy<BinaryFunction, Iterator>>
super_t;
// friend thrust::iterator_core_access to allow it access to the private interface dereference()
friend class thrust::iterator_core_access;
__host__ __device__ output_writer_iterator(Iterator const& x, BinaryFunction fun)
: super_t(x), fun(fun)
{
}

private:
BinaryFunction fun;

// thrust::iterator_core_access accesses this function
__host__ __device__ typename super_t::reference dereference() const
{
return thrust::detail::output_writer_iterator_proxy<BinaryFunction, Iterator>(
this->base_reference(), fun);
}
};

template <typename BinaryFunction, typename Iterator>
output_writer_iterator<BinaryFunction, Iterator> __host__ __device__
make_output_writer_iterator(Iterator out, BinaryFunction fun)
{
return output_writer_iterator<BinaryFunction, Iterator>(out, fun);
} // end make_output_writer_iterator
} // namespace cudf
5 changes: 4 additions & 1 deletion cpp/src/groupby/sort/group_argmax.cu
@@ -33,7 +33,10 @@ std::unique_ptr<column> group_argmax(column_view const& values,
                                      rmm::cuda_stream_view stream,
                                      rmm::mr::device_memory_resource* mr)
 {
-  auto indices = type_dispatcher(values.type(),
+  auto values_type = cudf::is_dictionary(values.type())
+                       ? dictionary_column_view(values).keys().type()
+                       : values.type();
+  auto indices = type_dispatcher(values_type,
                                  reduce_functor<aggregation::ARGMAX>{},
                                  values,
                                  num_groups,
5 changes: 4 additions & 1 deletion cpp/src/groupby/sort/group_argmin.cu
@@ -33,7 +33,10 @@ std::unique_ptr<column> group_argmin(column_view const& values,
                                      rmm::cuda_stream_view stream,
                                      rmm::mr::device_memory_resource* mr)
 {
-  auto indices = type_dispatcher(values.type(),
+  auto values_type = cudf::is_dictionary(values.type())
+                       ? dictionary_column_view(values).keys().type()
+                       : values.type();
+  auto indices = type_dispatcher(values_type,
                                  reduce_functor<aggregation::ARGMIN>{},
                                  values,
                                  num_groups,
12 changes: 5 additions & 7 deletions cpp/src/groupby/sort/group_max.cu
@@ -27,13 +27,11 @@ std::unique_ptr<column> group_max(column_view const& values,
                                   rmm::cuda_stream_view stream,
                                   rmm::mr::device_memory_resource* mr)
 {
-  return type_dispatcher(values.type(),
-                         reduce_functor<aggregation::MAX>{},
-                         values,
-                         num_groups,
-                         group_labels,
-                         stream,
-                         mr);
+  auto values_type = cudf::is_dictionary(values.type())
+                       ? dictionary_column_view(values).keys().type()
+                       : values.type();
+  return type_dispatcher(
+    values_type, reduce_functor<aggregation::MAX>{}, values, num_groups, group_labels, stream, mr);
 }
 
 }  // namespace detail
12 changes: 5 additions & 7 deletions cpp/src/groupby/sort/group_min.cu
@@ -27,13 +27,11 @@ std::unique_ptr<column> group_min(column_view const& values,
                                   rmm::cuda_stream_view stream,
                                   rmm::mr::device_memory_resource* mr)
 {
-  return type_dispatcher(values.type(),
-                         reduce_functor<aggregation::MIN>{},
-                         values,
-                         num_groups,
-                         group_labels,
-                         stream,
-                         mr);
+  auto values_type = cudf::is_dictionary(values.type())
+                       ? dictionary_column_view(values).keys().type()
+                       : values.type();
+  return type_dispatcher(
+    values_type, reduce_functor<aggregation::MIN>{}, values, num_groups, group_labels, stream, mr);
 }
 
 }  // namespace detail
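The four hunks above repeat the same dispatch pattern: a dictionary-encoded values column dispatches on its keys' type, anything else on its own type. A hypothetical helper (not in this PR) capturing that pattern:

inline cudf::data_type dispatched_type_of(cudf::column_view const& values)
{
  // Dictionary columns aggregate over their keys, so dispatch on the keys' type.
  return cudf::is_dictionary(values.type())
           ? cudf::dictionary_column_view(values).keys().type()
           : values.type();
}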