Consolidate SUM reductions (#2381)
This PR consists of multiple parts:

1. Redirect custom reduction kernels within the `stats` namespace to `linalg::reduce`
2. Specialize reduction kernels for addition, utilizing the _Kahan-Babushka-Neumaier sum_ ([link](https://en.wikipedia.org/wiki/Kahan_summation_algorithm))
3. Slightly adjust kernel heuristics for coalesced reductions

This should address #2366 and #2205. With the kernel heuristics adjusted, the maximum performance drop is 4%.
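For illustration, here is a minimal host-side sketch of the compensated accumulation scheme (it mirrors the `KahanBabushkaNeumaierSum` device helper added in this PR; the loop count and test values are made up for the demo):

```cpp
#include <cmath>
#include <cstdio>

// Accumulate `value` into `sum`, capturing the rounding error of each addition in
// `compensation` (Neumaier's variant of Kahan summation).
static void kbn_add(float& sum, float& compensation, float value)
{
  const float t = sum + value;
  if (std::abs(sum) >= std::abs(value)) {
    compensation += (sum - t) + value;  // low-order bits of `value` were lost
  } else {
    compensation += (value - t) + sum;  // low-order bits of `sum` were lost
  }
  sum = t;
}

int main()
{
  float naive = 1.0f, sum = 1.0f, c = 0.0f;
  for (int i = 0; i < 10000000; i++) {
    naive += 1e-8f;          // below float's resolution at 1.0: silently dropped
    kbn_add(sum, c, 1e-8f);  // the dropped amount accumulates in `c` instead
  }
  std::printf("naive: %f  compensated: %f\n", naive, sum + c);  // ~1.0 vs ~1.1
  return 0;
}
```

Ten million additions of 1e-8 are lost entirely by the naive float sum, while the compensated total lands near the correct 1.1.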

FYI, @tfeher

Authors:
  - Malte Förster (https://github.com/mfoerste4)

Approvers:
  - Tamas Bela Feher (https://github.com/tfeher)
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #2381
mfoerste4 authored Jul 24, 2024
1 parent c30fc23 commit 759095a
Showing 11 changed files with 457 additions and 367 deletions.
6 changes: 5 additions & 1 deletion cpp/include/raft/core/math.hpp
@@ -56,7 +56,11 @@ constexpr RAFT_INLINE_FUNCTION auto abs(T x)
!std::is_same_v<long long int, T>,
T>
{
return x < T{0} ? -x : x;
if constexpr (std::is_unsigned_v<T>) {
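    // Unsigned values are already non-negative; negating them would wrap modulo 2^N.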
return x;
} else {
return x < T{0} ? -x : x;
}
}
#if defined(_RAFT_HAS_CUDA)
template <typename T>
218 changes: 201 additions & 17 deletions cpp/include/raft/linalg/detail/coalesced_reduction-inl.cuh
@@ -42,6 +42,18 @@ struct ReductionThinPolicy {
static constexpr bool NoSequentialReduce = noLoop;
};
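
// Neumaier's variant of Kahan summation: the rounding error of each `sum + cur_value`
// is folded into `compensation`, branching on whichever operand has the larger
// magnitude. Callers recover the compensated total as `sum + compensation`.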

template <typename Type>
DI void KahanBabushkaNeumaierSum(Type& sum, Type& compensation, const Type& cur_value)
{
const Type t = sum + cur_value;
if (abs(sum) >= abs(cur_value)) {
compensation += (sum - t) + cur_value;
} else {
compensation += (cur_value - t) + sum;
}
sum = t;
}

template <typename Policy,
typename InType,
typename OutType,
@@ -130,6 +142,99 @@ RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock)
}
}

template <typename Policy,
typename InType,
typename OutType,
typename IdxType,
typename MainLambda,
typename FinalLambda>
RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock) coalescedSumThinKernel(OutType* dots,
const InType* data,
IdxType D,
IdxType N,
OutType init,
MainLambda main_op,
FinalLambda final_op,
bool inplace = false)
{
/* The strategy to achieve near-SOL memory bandwidth differs based on D:
* - For small D, we need to process multiple rows per logical warp in order to have
* multiple loads per thread and increase bytes in flight and amortize latencies.
* - For large D, we start with a sequential reduction. The compiler partially unrolls
* that loop (e.g. first a loop of stride 16, then 8, 4, and 1).
*/
IdxType i0 = threadIdx.y + (Policy::RowsPerBlock * static_cast<IdxType>(blockIdx.x));
if (i0 >= N) return;

OutType acc[Policy::RowsPerLogicalWarp];
OutType thread_c[Policy::RowsPerLogicalWarp];
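  // acc holds the running sums; thread_c holds the per-row Neumaier compensation terms.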

#pragma unroll
for (int k = 0; k < Policy::RowsPerLogicalWarp; k++) {
acc[k] = init;
thread_c[k] = 0;
}

if constexpr (Policy::NoSequentialReduce) {
IdxType j = threadIdx.x;
if (j < D) {
#pragma unroll
for (IdxType k = 0; k < Policy::RowsPerLogicalWarp; k++) {
// Only the first row is known to be within bounds. Clamp to avoid out-of-mem read.
const IdxType i = raft::min(i0 + k * Policy::NumLogicalWarps, N - 1);
// acc[k] = reduce_op(acc[k], main_op(data[j + (D * i)], j));
KahanBabushkaNeumaierSum<OutType>(acc[k], thread_c[k], main_op(data[j + (D * i)], j));
}
}
} else {
for (IdxType j = threadIdx.x; j < D; j += Policy::LogicalWarpSize) {
#pragma unroll
for (IdxType k = 0; k < Policy::RowsPerLogicalWarp; k++) {
const IdxType i = raft::min(i0 + k * Policy::NumLogicalWarps, N - 1);
// acc[k] = reduce_op(acc[k], main_op(data[j + (D * i)], j));
KahanBabushkaNeumaierSum<OutType>(acc[k], thread_c[k], main_op(data[j + (D * i)], j));
}
}
}

/* This vector reduction has two benefits compared to naive separate reductions:
* - It avoids the LSU bottleneck when the number of columns is around 32 (e.g. for 32, 5 shuffles
* are required and there is no initial sequential reduction to amortize that cost).
* - It distributes the outputs to multiple threads, enabling a coalesced store when the number of
* rows per logical warp and logical warp size are equal.
*/
raft::logicalWarpReduceVector<Policy::LogicalWarpSize, Policy::RowsPerLogicalWarp>(
acc, threadIdx.x, raft::add_op());

raft::logicalWarpReduceVector<Policy::LogicalWarpSize, Policy::RowsPerLogicalWarp>(
thread_c, threadIdx.x, raft::add_op());
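  // The compensation terms are reduced with the same vector reduction and folded into
  // the final result below via acc[k] + thread_c[k].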

constexpr int reducOutVecWidth =
std::max(1, Policy::RowsPerLogicalWarp / Policy::LogicalWarpSize);
constexpr int reducOutGroupSize =
std::max(1, Policy::LogicalWarpSize / Policy::RowsPerLogicalWarp);
constexpr int reducNumGroups = Policy::LogicalWarpSize / reducOutGroupSize;

if (threadIdx.x % reducOutGroupSize == 0) {
const int groupId = threadIdx.x / reducOutGroupSize;
if (inplace) {
#pragma unroll
for (int k = 0; k < reducOutVecWidth; k++) {
const int reductionId = k * reducNumGroups + groupId;
const IdxType i = i0 + reductionId * Policy::NumLogicalWarps;
if (i < N) { dots[i] = final_op(dots[i] + acc[k] + thread_c[k]); }
}
} else {
#pragma unroll
for (int k = 0; k < reducOutVecWidth; k++) {
const int reductionId = k * reducNumGroups + groupId;
const IdxType i = i0 + reductionId * Policy::NumLogicalWarps;
if (i < N) { dots[i] = final_op(acc[k] + thread_c[k]); }
}
}
}
}

template <typename Policy,
typename InType,
typename OutType = InType,
Expand All @@ -156,8 +261,13 @@ void coalescedReductionThin(OutType* dots,
static_cast<int>(Policy::NoSequentialReduce));
dim3 threads(Policy::LogicalWarpSize, Policy::NumLogicalWarps, 1);
dim3 blocks(ceildiv<IdxType>(N, Policy::RowsPerBlock), 1, 1);
coalescedReductionThinKernel<Policy>
<<<blocks, threads, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumThinKernel<Policy>
<<<blocks, threads, 0, stream>>>(dots, data, D, N, init, main_op, final_op, inplace);
} else {
coalescedReductionThinKernel<Policy><<<blocks, threads, 0, stream>>>(
dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

@@ -240,6 +350,44 @@ RAFT_KERNEL __launch_bounds__(TPB) coalescedReductionMediumKernel(OutType* dots,
}
}

template <int TPB,
typename InType,
typename OutType,
typename IdxType,
typename MainLambda,
typename FinalLambda>
RAFT_KERNEL __launch_bounds__(TPB) coalescedSumMediumKernel(OutType* dots,
const InType* data,
IdxType D,
IdxType N,
OutType init,
MainLambda main_op,
FinalLambda final_op,
bool inplace = false)
{
typedef cub::BlockReduce<OutType, TPB, cub::BLOCK_REDUCE_RAKING> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage1;
__shared__ typename BlockReduce::TempStorage temp_storage2;
OutType thread_data = init;
OutType thread_c = (OutType)0;

IdxType rowStart = blockIdx.x * D;
for (IdxType i = threadIdx.x; i < D; i += TPB) {
IdxType idx = rowStart + i;
KahanBabushkaNeumaierSum<OutType>(thread_data, thread_c, main_op(data[idx], i));
}
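  // Block-reduce the running sums and compensation terms separately; their sum is the
  // compensated row total.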
OutType block_acc = BlockReduce(temp_storage1).Sum(thread_data);
OutType block_c = BlockReduce(temp_storage2).Sum(thread_c);

if (threadIdx.x == 0) {
if (inplace) {
dots[blockIdx.x] = final_op(dots[blockIdx.x] + block_acc + block_c);
} else {
dots[blockIdx.x] = final_op(block_acc + block_c);
}
}
}

template <int TPB,
typename InType,
typename OutType = InType,
@@ -259,8 +407,13 @@ void coalescedReductionMedium(OutType* dots,
FinalLambda final_op = raft::identity_op())
{
common::nvtx::range<common::nvtx::domain::raft> fun_scope("coalescedReductionMedium<%d>", TPB);
coalescedReductionMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, final_op, inplace);
} else {
coalescedReductionMediumKernel<TPB>
<<<N, TPB, 0, stream>>>(dots, data, D, N, init, main_op, reduce_op, final_op, inplace);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

@@ -322,6 +475,32 @@ RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock)
if (threadIdx.x == 0) { buffer[Policy::BlocksPerRow * blockIdx.x + blockIdx.y] = acc; }
}

template <typename Policy, typename InType, typename OutType, typename IdxType, typename MainLambda>
RAFT_KERNEL __launch_bounds__(Policy::ThreadsPerBlock) coalescedSumThickKernel(
OutType* buffer, const InType* data, IdxType D, IdxType N, OutType init, MainLambda main_op)
{
typedef cub::BlockReduce<OutType, Policy::ThreadsPerBlock, cub::BLOCK_REDUCE_RAKING> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage1;
__shared__ typename BlockReduce::TempStorage temp_storage2;

OutType thread_data = init;
OutType thread_c = (OutType)0;

IdxType rowStart = blockIdx.x * D;
for (IdxType i = blockIdx.y * Policy::ThreadsPerBlock + threadIdx.x; i < D;
i += Policy::BlockStride) {
IdxType idx = rowStart + i;
KahanBabushkaNeumaierSum<OutType>(thread_data, thread_c, main_op(data[idx], i));
}

OutType block_acc = BlockReduce(temp_storage1).Sum(thread_data);
OutType block_c = BlockReduce(temp_storage2).Sum(thread_c);

if (threadIdx.x == 0) {
buffer[Policy::BlocksPerRow * blockIdx.x + blockIdx.y] = block_acc + block_c;
}
}

template <typename ThickPolicy,
typename ThinPolicy,
typename InType,
@@ -355,9 +534,13 @@ void coalescedReductionThick(OutType* dots,
* 2. coalescedReductionThinKernel reduces [N x BlocksPerRow] to [N x 1]. It doesn't apply any
* main_op but applies final_op. If in-place, the existing and new values are reduced.
*/

coalescedReductionThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op, reduce_op);
if constexpr (std::is_same_v<ReduceLambda, raft::add_op>) {
coalescedSumThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op);
} else {
coalescedReductionThickKernel<ThickPolicy>
<<<blocks, threads, 0, stream>>>(buffer.data(), data, D, N, init, main_op, reduce_op);
}
RAFT_CUDA_TRY(cudaPeekAtLastError());

coalescedReductionThin<ThinPolicy>(dots,
@@ -391,18 +574,16 @@ void coalescedReductionThickDispatcher(OutType* dots,
{
// Note: multiple elements per thread to take advantage of the sequential reduction and loop
// unrolling
if (D < IdxType(32768)) {
coalescedReductionThick<ReductionThickPolicy<256, 32>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else {
coalescedReductionThick<ReductionThickPolicy<256, 64>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
}
coalescedReductionThick<ReductionThickPolicy<256, 64>, ReductionThinPolicy<32, 128, 1>>(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
}

// Primitive to perform reductions along the coalesced dimension of the matrix, i.e. reduce along
// rows for a row-major layout or along columns for a column-major layout. Can do an in-place
// reduction that adds to the original values of dots if requested.
// In the case of an add-reduction, a compensated summation is performed to reduce numerical
// error. Note that the compensation is only applied per thread for performance reasons, so the
// result is not equivalent to a fully sequential compensated sum.
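// Example (sketch, with hypothetical buffers): row-wise sums of a row-major [N x D]
// matrix, which takes the compensated path because reduce_op is raft::add_op():
//   coalescedReduction(dots, data, D, N, OutType(0), stream, false,
//                      raft::identity_op(), raft::add_op(), raft::identity_op());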
template <typename InType,
typename OutType = InType,
typename IdxType = int,
Expand All @@ -422,14 +603,17 @@ void coalescedReduction(OutType* dots,
{
/* The primitive selects one of three implementations based on heuristics:
* - Thin: very efficient when D is small and/or N is large
* At most one warp is processing each row
* - Thick: used when N is very small and D very large
* - Medium: used when N is too small to fill the GPU with the thin kernel
* Multiple blocks (32/64) processing each row
* - Medium: everything in between
* One block is processing each row
*/
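  // Dispatch sketch with illustrative sizes (assuming numSMs == 108, e.g. an A100):
  //   D=256, N=10000 -> thin;  D=4096, N=500 -> medium;  D=200000, N=50 -> thick.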
const IdxType numSMs = raft::getMultiProcessorCount();
if (D <= IdxType(256) || N >= IdxType(4) * numSMs) {
if (D <= IdxType(512) || (N >= IdxType(16) * numSMs && D < IdxType(2048))) {
coalescedReductionThinDispatcher(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else if (N < numSMs && D >= IdxType(16384)) {
} else if (N < numSMs && D >= IdxType(1 << 17)) {
coalescedReductionThickDispatcher(
dots, data, D, N, init, stream, inplace, main_op, reduce_op, final_op);
} else {