diff --git a/cpp/include/raft/device_atomics.cuh b/cpp/include/raft/device_atomics.cuh
index d1ca239170..28f7516688 100644
--- a/cpp/include/raft/device_atomics.cuh
+++ b/cpp/include/raft/device_atomics.cuh
@@ -27,6 +27,7 @@
  * binary operator.
  */
 
+#include <cooperative_groups.h>
 #include <type_traits>
 
 namespace raft {
@@ -636,3 +637,32 @@ __forceinline__ __device__ T atomicXor(T* address, T val)
 {
   return raft::genericAtomicOperation(address, val, raft::device_atomics::detail::DeviceXor{});
 }
+
+/**
+ * @brief Warp-aggregated atomic increment
+ *
+ * Increments an atomic counter using all active threads in a warp. The return
+ * value is the original value of the counter plus the rank of the calling
+ * thread.
+ *
+ * The use of atomicIncWarp is a performance optimization. It can reduce the
+ * amount of atomic memory traffic by a factor of 32 (the warp size).
+ *
+ * Adapted from:
+ * https://developer.nvidia.com/blog/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/
+ *
+ * @tparam T An integral type
+ * @param[in,out] ctr The address of the counter to increment
+ *
+ * @return The old value of the counter plus the rank of the calling thread.
+ */
+template <typename T, std::enable_if_t<std::is_integral<T>::value, T>* = nullptr>
+__device__ T atomicIncWarp(T* ctr)
+{
+  namespace cg = cooperative_groups;
+  auto g = cg::coalesced_threads();
+  T warp_res;
+  // Thread 0 performs a single atomicAdd on behalf of all threads in the group.
+  if (g.thread_rank() == 0) { warp_res = atomicAdd(ctr, static_cast<T>(g.size())); }
+  return g.shfl(warp_res, 0) + g.thread_rank();
+}
diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt
index 43c6257966..3eead4d494 100644
--- a/cpp/test/CMakeLists.txt
+++ b/cpp/test/CMakeLists.txt
@@ -20,6 +20,7 @@ add_executable(test_raft
   test/common/seive.cu
   test/cudart_utils.cpp
   test/cluster_solvers.cu
+  test/device_atomics.cu
   test/distance/dist_adj.cu
   test/distance/dist_canberra.cu
   test/distance/dist_chebyshev.cu
diff --git a/cpp/test/device_atomics.cu b/cpp/test/device_atomics.cu
new file mode 100644
index 0000000000..8ecedbe7af
--- /dev/null
+++ b/cpp/test/device_atomics.cu
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <array>
+#include <gtest/gtest.h>
+#include <iostream>
+#include <numeric>
+#include <raft/cudart_utils.h>
+#include <raft/device_atomics.cuh>
+#include <rmm/cuda_stream.hpp>
+#include <rmm/cuda_stream_pool.hpp>
+#include <rmm/cuda_stream_view.hpp>
+#include <rmm/device_scalar.hpp>
+#include <rmm/device_uvector.hpp>
+
+namespace raft {
+
+__global__ void test_atomic_inc_warp_kernel(int* counter, int* out_array)
+{
+  int global_tid = blockDim.x * blockIdx.x + threadIdx.x;
+  out_array[atomicIncWarp(counter)] = global_tid;
+}
+
+TEST(Raft, AtomicIncWarp)
+{
+  const int num_blocks        = 1024;
+  const int threads_per_block = 1024;
+  const int num_elts          = num_blocks * threads_per_block;
+
+  rmm::cuda_stream_pool pool{1};
+  auto s = pool.get_stream();
+
+  rmm::device_scalar<int> counter{0, s};
+  rmm::device_uvector<int> out_device(num_elts, s);
+  std::array<int, num_elts> out_host{0};
+
+  // Write all 1M thread indices to a unique location in `out_device`
+  test_atomic_inc_warp_kernel<<<num_blocks, threads_per_block, 0, s>>>(counter.data(),
+                                                                       out_device.data());
+
+  // Copy data to host
+  RAFT_CUDA_TRY(cudaMemcpy(out_host.data(),
+                           (const void*)out_device.data(),
+                           num_elts * sizeof(int),
+                           cudaMemcpyDeviceToHost));
+
+  // Check that the count is correct and that each thread index is contained in
+  // the array exactly once.
+  ASSERT_EQ(num_elts, counter.value(s));
+  std::sort(out_host.begin(), out_host.end());
+  for (int i = 0; i < num_elts; ++i) {
+    ASSERT_EQ(i, out_host[i]);
+  }
+}
+
+}  // namespace raft
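
Usage sketch (not part of this patch): the intended call pattern for atomicIncWarp is the warp-aggregated filtering idiom from the NVIDIA blog post linked in the doc comment, where only the threads whose element passes a predicate are active at the call site, so each warp issues at most one atomicAdd. The kernel below and its names (filter_positive_kernel, src, dst, d_num_out) are hypothetical, for illustration only.

#include <raft/device_atomics.cuh>

// Hypothetical stream-compaction kernel: copies the positive elements of
// `src` into `dst` and counts them in `*d_num_out`. Threads whose element
// fails the predicate are inactive at the atomicIncWarp call, so the
// coalesced group contains exactly the threads that need an output slot.
__global__ void filter_positive_kernel(const int* src, int n, int* dst, int* d_num_out)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n && src[i] > 0) {
    // Each active thread receives a distinct, consecutive index into `dst`.
    dst[raft::atomicIncWarp(d_num_out)] = src[i];
  }
}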