From dbd348e101b78cfc780801269393305d708da77f Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 27 Apr 2023 05:35:09 -0700 Subject: [PATCH 01/20] add reduction op type, ranked reduction and weighted reduction --- cpp/include/raft/util/cuda_utils.cuh | 143 ++++++++++++++++++++++++-- cpp/test/util/reduction.cu | 148 +++++++++++++++++++++++++++ 2 files changed, 283 insertions(+), 8 deletions(-) create mode 100644 cpp/test/util/reduction.cu diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh index 687a6b4651..56ed8d2f05 100644 --- a/cpp/include/raft/util/cuda_utils.cuh +++ b/cpp/include/raft/util/cuda_utils.cuh @@ -854,7 +854,7 @@ DI T warpReduce(T val, ReduceLambda reduce_op) } /** - * @brief Warp-level sum reduction + * @brief Warp-level reduction * @tparam T Value type to be reduced * @param val input value * @return Reduction result. All lanes will have the valid result. @@ -869,28 +869,155 @@ DI T warpReduce(T val) } /** - * @brief 1-D block-level sum reduction + * @brief 1-D block-level reduction * @param val input value * @param smem shared memory region needed for storing intermediate results. It * must alteast be of size: `sizeof(T) * nWarps` + * @param reduce_op a binary reduction operation. * @return only the thread0 will contain valid reduced result * @note Why not cub? Because cub doesn't seem to allow working with arbitrary * number of warps in a block. All threads in the block must enter this - * function together - * @todo Expand this to support arbitrary reduction ops + * function together. cub also uses too many registers */ -template -DI T blockReduce(T val, char* smem) +template +DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) { auto* sTemp = reinterpret_cast(smem); int nWarps = (blockDim.x + WarpSize - 1) / WarpSize; int lid = laneId(); int wid = threadIdx.x / WarpSize; - val = warpReduce(val); + val = warpReduce(val, reduce_op); if (lid == 0) sTemp[wid] = val; __syncthreads(); val = lid < nWarps ? sTemp[lid] : T(0); - return warpReduce(val); + return warpReduce(val, reduce_op); +} + +/** + * @brief 1-D warp-level weighted random reduction which returns the rank. + * used conditional probability to return the weighted random rank of the result. + * @param rng random number generator, must have next_u32() function + * @param weight weight of the rank/index. + * @param idx index to be used as rank + * @return only the thread0 will contain valid reduced result + */ +template +__device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) +{ +#pragma unroll + for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { + T tmp_weight = shfl(weight, laneId() + offset); + i_t tmp_idx = shfl(idx, laneId() + offset); + T sum = (tmp_weight + weight); + weight = sum; + if (sum != 0) { + i_t rnd_number = (rng.next_u32() % sum); + if (rnd_number < tmp_weight) { idx = tmp_idx; } + } + } +} + + +/** + * @brief 1-D block-level weighted random reduction which returns the rank. + * used conditional probability to return the weighted random rank of the result. + * @param rng random number generator, must have next_u32() function + * @param shbuf shared memory region needed for storing intermediate results. It + * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` + * @param weight weight of the rank/index. 
+ * @param idx index to be used as rank + * @return only the thread0 will contain valid reduced result + */ + template + __inline__ __device__ i_t blockRandomReduce( rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) + { + T* values = shbuf; + i_t* indices = (i_t*)&shbuf[WarpSize]; + i_t wid = threadIdx.x / WarpSize; + i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + warpRandomReduce(rng,weight, idx); // Each warp performs partial reduction + i_t lane = laneId(); + if (lane == 0) { + values[wid] = weight; // Write reduced value to shared memory + indices[wid] = idx; // Write reduced value to shared memory + } + + __syncthreads(); // Wait for all partial reductions + + // read from shared memory only if that warp existed + if (lane < nWarps) { + weight = values[lane]; + idx = indices[lane]; + } else { + // get the min if it is a max op, get the max if it is a min op + weight = 0; + idx = -1; + } + __syncthreads(); + if (wid == 0) warpRandomReduce(rng, weight, idx); + return idx; + } + +/** + * @brief 1-D warp-level ranked reduction which returns the value and rank. + * thread 0 will have valid result and rank(idx). + * @param val input value + * @param idx index to be used as rank + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + */ +template +__inline__ __device__ void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{}) +{ +#pragma unroll + for (i_t offset = WarpSize / 2; offset > 0; offset /= 2) { + T tmpVal = shfl(val, laneId() + offset); + i_t tmpIdx = shfl(idx, laneId() + offset); + if (reduce_op(tmpVal , val) == tmpVal) { + val = tmpVal; + idx = tmpIdx; + } + } +} + +/** + * @brief 1-D block-level ranked reduction which returns the value and rank. + * thread 0 will have valid result and rank(idx). + * @param val input value + * @param smem shared memory region needed for storing intermediate results. It + * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` + * @param idx index to be used as rank + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + */ +template +__inline__ __device__ std::pair blockRankedReduce(T val, T* shbuf, i_t idx = threadIdx.x, ReduceLambda reduce_op = raft::min_op{}) +{ + T* values = shbuf; + i_t* indices = (i_t*)&shbuf[WarpSize]; + i_t wid = threadIdx.x / WarpSize; + i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + warpRankedReduce(val, idx, reduce_op); // Each warp performs partial reduction + i_t lane = laneId(); + if (lane == 0) { + values[wid] = val; // Write reduced value to shared memory + indices[wid] = idx; // Write reduced value to shared memory + } + + __syncthreads(); // Wait for all partial reductions + + // read from shared memory only if that warp existed + if (lane < nWarps) { + val = values[lane]; + idx = indices[lane]; + } else { + // get the min if it is a max op, get the max if it is a min op + val = reduce_op(std::numeric_limits::min(), std::numeric_limits::max()) == std::numeric_limits::min() ? std::numeric_limits::max() : std::numeric_limits::min(); + idx = -1; + } + __syncthreads(); + if (wid == 0) warpRankedReduce(val, idx, reduce_op); + return std::pair{val,idx}; } /** diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu new file mode 100644 index 0000000000..9ad6353994 --- /dev/null +++ b/cpp/test/util/reduction.cu @@ -0,0 +1,148 @@ +/* +* Copyright (c) 2023, NVIDIA CORPORATION. 
+* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "../test_utils.cuh" + +#include +#include + +#include +#include + +#include + +#include +#include + +namespace raft::util { + +template +__global__ void test_reduction_kernel(const int* input, int* reduction_res, ReduceLambda reduce_op) +{ + assert(gridDim.x == 1); + __shared__ int red_buf[raft::WarpSize]; + int th_val = input[threadIdx.x]; + th_val = raft::blockReduce(th_val, (char*)red_buf, reduce_op); + if(threadIdx.x == 0){ + reduction_res[0] = th_val; + } +} + +template +__global__ void test_ranked_reduction_kernel(const int* input, int* reduction_res, int* out_rank, ReduceLambda reduce_op) +{ + assert(gridDim.x == 1); + __shared__ int red_buf[2 * raft::WarpSize]; + int th_val = input[threadIdx.x]; + int th_rank = threadIdx.x; + auto result = raft::blockRankedReduce(th_val, red_buf, th_rank, reduce_op); + if(threadIdx.x == 0){ + reduction_res[0] = result.first; + out_rank[0] = result.second; + } +} + +__global__ void test_random_reduction_kernel(const int* input, int* reduction_res) +{ + assert(gridDim.x == 1); + __shared__ int red_buf[2 * raft::WarpSize]; + raft::random::PCGenerator thread_rng(1234, threadIdx.x, 0); + int th_val = input[threadIdx.x]; + int th_rank = threadIdx.x; + int result = raft::blockRandomReduce(thread_rng,red_buf, th_val, th_rank); + if(threadIdx.x == 0){ + reduction_res[0] = result; + } +} + + +struct reduction_launch { + template + static void run(const rmm::device_uvector& arr_d, int ref_val, ReduceLambda reduce_op, rmm::cuda_stream_view stream) + { + rmm::device_scalar ref_d(stream); + const int block_dim = 64; + const int grid_dim = 1; + test_reduction_kernel + <<>>(arr_d.data(), ref_d.data(),reduce_op); + stream.synchronize(); + RAFT_CUDA_TRY(cudaPeekAtLastError()); + ASSERT_EQ(ref_d.value(stream), ref_val); + } + + template + static void run_ranked(const rmm::device_uvector& arr_d, int ref_val, int rank_ref_val, ReduceLambda reduce_op, rmm::cuda_stream_view stream) + { + rmm::device_scalar ref_d(stream); + rmm::device_scalar rank_d(stream); + const int block_dim = 64; + const int grid_dim = 1; + test_ranked_reduction_kernel + <<>>(arr_d.data(), ref_d.data(),rank_d.data(), reduce_op); + stream.synchronize(); + RAFT_CUDA_TRY(cudaPeekAtLastError()); + ASSERT_EQ(ref_d.value(stream), ref_val); + ASSERT_EQ(rank_d.value(stream), rank_ref_val); + } + + static void run_random(const rmm::device_uvector& arr_d, int ref_val, rmm::cuda_stream_view stream) + { + rmm::device_scalar ref_d(stream); + const int block_dim = 64; + const int grid_dim = 1; + test_random_reduction_kernel + <<>>(arr_d.data(), ref_d.data()); + stream.synchronize(); + RAFT_CUDA_TRY(cudaPeekAtLastError()); + ASSERT_EQ(ref_d.value(stream), ref_val); + } + +}; + +template +class ReductionTest : public testing::TestWithParam> { // NOLINT + protected: + const std::vector input; // NOLINT + + public: + explicit ReductionTest() + : input(testing::TestWithParam>::GetParam()) + { + auto stream = rmm::cuda_stream_default; + rmm::device_uvector 
arr_d(input.size(), stream); + + update_device(arr_d.data(), input.data(), input.size(), stream); + // calculate the results + reduction_launch::run(arr_d, 0, raft::min_op{}, stream); + reduction_launch::run(arr_d, 5, raft::max_op{}, stream); + reduction_launch::run(arr_d, 158, raft::add_op{}, stream); + reduction_launch::run_ranked(arr_d, 5, 15, raft::max_op{}, stream); + reduction_launch::run_ranked(arr_d, 0, 26, raft::min_op{}, stream); + // value 15 is for the current state of PCgenerator. adjust this if rng changes + reduction_launch::run_random(arr_d, 15,stream); + } + +}; + +const std::vector test_vector{1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,5, 1, 2, 3,4, 1, 2, 3,4,1, 2, 0,4, 1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,}; +auto reduction_input = ::testing::Values(test_vector); + + +using ReductionTestInt = ReductionTest; // NOLINT +TEST_P(ReductionTestInt, ALL) {} // NOLINT +INSTANTIATE_TEST_CASE_P(ReductionTest,ReductionTestInt, reduction_input); // NOLINT + +} // namespace raft::util From e911e6696f25910b89c22f1dbf477249ea5387a8 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 27 Apr 2023 08:26:58 -0700 Subject: [PATCH 02/20] add binary reduction and pow2 --- cpp/include/raft/util/cuda_utils.cuh | 28 ++++++++++ cpp/include/raft/util/device_loads_stores.cuh | 56 +++++++++++++++++++ cpp/include/raft/util/pow2_utils.cuh | 15 +++++ cpp/test/util/reduction.cu | 54 +++++++++++++++--- 4 files changed, 144 insertions(+), 9 deletions(-) diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh index 56ed8d2f05..10c97a2b06 100644 --- a/cpp/include/raft/util/cuda_utils.cuh +++ b/cpp/include/raft/util/cuda_utils.cuh @@ -1020,6 +1020,34 @@ __inline__ __device__ std::pair blockRankedReduce(T val, T* shbuf, i_t i return std::pair{val,idx}; } +/** + * @brief Executes a 1d binary block reduce + * @param val binary value to be reduced across the thread block + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + */ +template +DI i_t binaryBlockReduce(i_t val, i_t* shared) +{ + static_assert(BLOCK_SIZE <= 1024); + assert(val == 0 || val == 1); + const uint32_t mask = __ballot_sync(~0, val); + const uint32_t n_items = __popc(mask); + + // Each first thread of the warp + if (threadIdx.x % WarpSize == 0) + shared[threadIdx.x / WarpSize] = n_items; + __syncthreads(); + + val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shared[threadIdx.x] : 0; + + if (threadIdx.x < WarpSize) + return warpReduce(val); + else // Only first warp gets the results + return -1; +} + + /** * @brief Simple utility function to determine whether user_stream or one of the * internal streams should be used. 
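To make the call pattern of the new `binaryBlockReduce` concrete, here is a minimal usage sketch (the kernel name, the predicate, and the launch shape are illustrative assumptions, not part of the patch); every thread of the block must reach the call, since the primitive synchronizes internally:

// Hypothetical caller of binaryBlockReduce: each thread votes 0 or 1 and
// thread 0 of every block writes out the number of ones in its block.
// Assumes the block is launched with exactly TPB threads.
template <int TPB>
__global__ void count_positives_kernel(const float* in, int* block_counts, int n)
{
  __shared__ int smem[TPB / raft::WarpSize];   // one slot per warp
  int i    = blockIdx.x * TPB + threadIdx.x;
  int flag = (i < n && in[i] > 0.0f) ? 1 : 0;  // out-of-range threads vote 0
  int cnt  = raft::binaryBlockReduce<TPB>(flag, smem);
  // Only the first warp receives the block-wide count; other warps get -1.
  if (threadIdx.x == 0) { block_counts[blockIdx.x] = cnt; }
}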
diff --git a/cpp/include/raft/util/device_loads_stores.cuh b/cpp/include/raft/util/device_loads_stores.cuh index c9bda26b81..fd7f937630 100644 --- a/cpp/include/raft/util/device_loads_stores.cuh +++ b/cpp/include/raft/util/device_loads_stores.cuh @@ -534,6 +534,62 @@ DI void ldg(int8_t (&x)[1], const int8_t* const& addr) x[0] = x_int; } +/** + * @brief Executes a 1D block strided copy + * @param dst destination pointer + * @param src source pointer + * @param size number of items to copy + */ + template + DI void block_copy(T* dst, const T* src, const size_t size) + { + for (auto i = threadIdx.x; i < size; i += blockDim.x) { + dst[i] = src[i]; + } + } + + /** + * @brief Executes a 1D block strided copy + * @param dst span of destination pointer + * @param src span of source pointer + * @param size number of items to copy + */ + template + DI void block_copy(raft::device_span dst, + const raft::device_span src, + const size_t size) + { + assert(src.size() >= size); + assert(dst.size() >= size); + block_copy(dst.data(), src.data(), size); + } + + /** + * @brief Executes a 1D block strided copy + * @param dst span of destination pointer + * @param src span of source pointer + * @param size number of items to copy + */ + template + DI void block_copy(raft::device_span dst, const raft::device_span src, const size_t size) + { + assert(src.size() >= size); + assert(dst.size() >= size); + block_copy(dst.data(), src.data(), size); + } + + /** + * @brief Executes a 1D block strided copy + * @param dst span of destination pointer + * @param src span of source pointer + */ + template + DI void block_copy(raft::device_span dst, const raft::device_span src) + { + assert(dst.size() >= src.size()); + block_copy(dst, src, src.size()); + } + /** @} */ } // namespace raft diff --git a/cpp/include/raft/util/pow2_utils.cuh b/cpp/include/raft/util/pow2_utils.cuh index 3b42682816..3268384586 100644 --- a/cpp/include/raft/util/pow2_utils.cuh +++ b/cpp/include/raft/util/pow2_utils.cuh @@ -20,6 +20,12 @@ namespace raft { +template +constexpr i_t next_pow2(i_t val) +{ + return 1 << (log2(val) + 1); +} + /** * @brief Fast arithmetics and alignment checks for power-of-two values known at compile time. * @@ -81,6 +87,15 @@ struct Pow2 { return x >> I(Log2); } + /** + * Rounds up the value to next power of two. + */ + template + Pow2_FUNC_QUALIFIER Pow2_WHEN_INTEGRAL(I) round_up_pow2(I val) noexcept + { + return 1 << (log2(val) + 1); + } + /** * x modulo Value operation (remainder of the `div(x)`) * (same as `x % Value` in Python). 
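Two details of these helpers are worth pinning down with a sketch. First, `next_pow2` (and the member `round_up_pow2`) step strictly upward, so a value that is already a power of two maps to the next one: `1 << (log2(8) + 1)` is 16, not 8. Second, `block_copy` is a plain strided loop with no barrier of its own. The snippet below assumes `raft::log2` is the integer floor-log2 already used elsewhere in the repo and that it is `constexpr`; the kernel name is illustrative:

// Expected behaviour of the new pow2 helper; note it is not idempotent on
// exact powers of two.
static_assert(raft::next_pow2(5) == 8);
static_assert(raft::next_pow2(8) == 16);

// Hypothetical kernel: one block cooperatively copies a span, then syncs.
__global__ void copy_kernel(raft::device_span<int> dst, raft::device_span<const int> src)
{
  raft::block_copy(dst, src);  // each thread copies a blockDim.x-strided subset
  __syncthreads();             // block_copy itself does not synchronize
}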
diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index 9ad6353994..d009f4bd95 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -68,6 +68,18 @@ __global__ void test_random_reduction_kernel(const int* input, int* reduction_re } } +template +__global__ void test_binary_reduction_kernel(const int* input, int* reduction_res) +{ + assert(gridDim.x == 1); + __shared__ int shared[TPB / WarpSize]; + int th_val = input[threadIdx.x]; + int result = raft::binaryBlockReduce(th_val,shared); + if(threadIdx.x == 0){ + reduction_res[0] = result; + } +} + struct reduction_launch { template @@ -110,21 +122,37 @@ struct reduction_launch { ASSERT_EQ(ref_d.value(stream), ref_val); } + static void run_binary(const rmm::device_uvector& arr_d, int ref_val, rmm::cuda_stream_view stream) + { + rmm::device_scalar ref_d(stream); + constexpr int block_dim = 64; + const int grid_dim = 1; + test_binary_reduction_kernel + <<>>(arr_d.data(), ref_d.data()); + stream.synchronize(); + RAFT_CUDA_TRY(cudaPeekAtLastError()); + ASSERT_EQ(ref_d.value(stream), ref_val); + } + }; template class ReductionTest : public testing::TestWithParam> { // NOLINT protected: - const std::vector input; // NOLINT - + const std::vector input; // NOLINT + rmm::cuda_stream_view stream; // NOLINT + rmm::device_uvector arr_d; // NOLINT + public: explicit ReductionTest() - : input(testing::TestWithParam>::GetParam()) - { - auto stream = rmm::cuda_stream_default; - rmm::device_uvector arr_d(input.size(), stream); - + : input(testing::TestWithParam>::GetParam()) , stream(rmm::cuda_stream_default), arr_d(input.size(), stream) + { update_device(arr_d.data(), input.data(), input.size(), stream); + + + } + + void run_reduction(){ // calculate the results reduction_launch::run(arr_d, 0, raft::min_op{}, stream); reduction_launch::run(arr_d, 5, raft::max_op{}, stream); @@ -135,14 +163,22 @@ class ReductionTest : public testing::TestWithParam> { // N reduction_launch::run_random(arr_d, 15,stream); } + void run_binary_reduction(){ + reduction_launch::run_binary(arr_d, 24,stream); + } + }; -const std::vector test_vector{1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,5, 1, 2, 3,4, 1, 2, 3,4,1, 2, 0,4, 1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,}; +const std::vector test_vector{1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,5, 1, 2, 3,4, 1, 2, 3,4,1, 2, 0,4, 1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4}; +const std::vector binary_test_vector{1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0}; auto reduction_input = ::testing::Values(test_vector); +auto binary_reduction_input = ::testing::Values(binary_test_vector); using ReductionTestInt = ReductionTest; // NOLINT -TEST_P(ReductionTestInt, ALL) {} // NOLINT +TEST_P(ReductionTestInt, REDUCTIONS) {run_reduction();} INSTANTIATE_TEST_CASE_P(ReductionTest,ReductionTestInt, reduction_input); // NOLINT +TEST_P(ReductionTestInt, BINARY_REDUCTION) {run_binary_reduction();} // NOLINT +INSTANTIATE_TEST_CASE_P(BinaryReductionTest,ReductionTestInt, binary_reduction_input); // NOLINT } // namespace raft::util From 4daafc1b01b227c7f54f38201224c23aa2b6fe47 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 27 Apr 2023 08:36:38 -0700 Subject: [PATCH 03/20] clang format --- cpp/include/raft/util/cuda_utils.cuh | 86 +++++----- cpp/include/raft/util/device_loads_stores.cuh | 78 ++++----- cpp/include/raft/util/pow2_utils.cuh 
| 12 +- cpp/test/util/reduction.cu | 158 +++++++++--------- 4 files changed, 173 insertions(+), 161 deletions(-) diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh index 10c97a2b06..485f3c4658 100644 --- a/cpp/include/raft/util/cuda_utils.cuh +++ b/cpp/include/raft/util/cuda_utils.cuh @@ -873,7 +873,7 @@ DI T warpReduce(T val) * @param val input value * @param smem shared memory region needed for storing intermediate results. It * must alteast be of size: `sizeof(T) * nWarps` - * @param reduce_op a binary reduction operation. + * @param reduce_op a binary reduction operation. * @return only the thread0 will contain valid reduced result * @note Why not cub? Because cub doesn't seem to allow working with arbitrary * number of warps in a block. All threads in the block must enter this @@ -917,7 +917,6 @@ __device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) } } - /** * @brief 1-D block-level weighted random reduction which returns the rank. * used conditional probability to return the weighted random rank of the result. @@ -928,35 +927,36 @@ __device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) * @param idx index to be used as rank * @return only the thread0 will contain valid reduced result */ - template - __inline__ __device__ i_t blockRandomReduce( rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) - { - T* values = shbuf; - i_t* indices = (i_t*)&shbuf[WarpSize]; - i_t wid = threadIdx.x / WarpSize; - i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - warpRandomReduce(rng,weight, idx); // Each warp performs partial reduction - i_t lane = laneId(); - if (lane == 0) { - values[wid] = weight; // Write reduced value to shared memory - indices[wid] = idx; // Write reduced value to shared memory - } - - __syncthreads(); // Wait for all partial reductions - - // read from shared memory only if that warp existed - if (lane < nWarps) { - weight = values[lane]; - idx = indices[lane]; - } else { - // get the min if it is a max op, get the max if it is a min op - weight = 0; - idx = -1; - } - __syncthreads(); - if (wid == 0) warpRandomReduce(rng, weight, idx); - return idx; - } +template +__inline__ __device__ i_t +blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) +{ + T* values = shbuf; + i_t* indices = (i_t*)&shbuf[WarpSize]; + i_t wid = threadIdx.x / WarpSize; + i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + warpRandomReduce(rng, weight, idx); // Each warp performs partial reduction + i_t lane = laneId(); + if (lane == 0) { + values[wid] = weight; // Write reduced value to shared memory + indices[wid] = idx; // Write reduced value to shared memory + } + + __syncthreads(); // Wait for all partial reductions + + // read from shared memory only if that warp existed + if (lane < nWarps) { + weight = values[lane]; + idx = indices[lane]; + } else { + // get the min if it is a max op, get the max if it is a min op + weight = 0; + idx = -1; + } + __syncthreads(); + if (wid == 0) warpRandomReduce(rng, weight, idx); + return idx; +} /** * @brief 1-D warp-level ranked reduction which returns the value and rank. 
@@ -967,13 +967,15 @@ __device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) * @return only the thread0 will contain valid reduced result */ template -__inline__ __device__ void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{}) +__inline__ __device__ void warpRankedReduce(T& val, + i_t& idx, + ReduceLambda reduce_op = raft::min_op{}) { #pragma unroll for (i_t offset = WarpSize / 2; offset > 0; offset /= 2) { T tmpVal = shfl(val, laneId() + offset); i_t tmpIdx = shfl(idx, laneId() + offset); - if (reduce_op(tmpVal , val) == tmpVal) { + if (reduce_op(tmpVal, val) == tmpVal) { val = tmpVal; idx = tmpIdx; } @@ -991,7 +993,10 @@ __inline__ __device__ void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduc * @return only the thread0 will contain valid reduced result */ template -__inline__ __device__ std::pair blockRankedReduce(T val, T* shbuf, i_t idx = threadIdx.x, ReduceLambda reduce_op = raft::min_op{}) +__inline__ __device__ std::pair blockRankedReduce(T val, + T* shbuf, + i_t idx = threadIdx.x, + ReduceLambda reduce_op = raft::min_op{}) { T* values = shbuf; i_t* indices = (i_t*)&shbuf[WarpSize]; @@ -1012,12 +1017,15 @@ __inline__ __device__ std::pair blockRankedReduce(T val, T* shbuf, i_t i idx = indices[lane]; } else { // get the min if it is a max op, get the max if it is a min op - val = reduce_op(std::numeric_limits::min(), std::numeric_limits::max()) == std::numeric_limits::min() ? std::numeric_limits::max() : std::numeric_limits::min(); + val = reduce_op(std::numeric_limits::min(), std::numeric_limits::max()) == + std::numeric_limits::min() + ? std::numeric_limits::max() + : std::numeric_limits::min(); idx = -1; } __syncthreads(); if (wid == 0) warpRankedReduce(val, idx, reduce_op); - return std::pair{val,idx}; + return std::pair{val, idx}; } /** @@ -1031,12 +1039,11 @@ DI i_t binaryBlockReduce(i_t val, i_t* shared) { static_assert(BLOCK_SIZE <= 1024); assert(val == 0 || val == 1); - const uint32_t mask = __ballot_sync(~0, val); + const uint32_t mask = __ballot_sync(~0, val); const uint32_t n_items = __popc(mask); // Each first thread of the warp - if (threadIdx.x % WarpSize == 0) - shared[threadIdx.x / WarpSize] = n_items; + if (threadIdx.x % WarpSize == 0) shared[threadIdx.x / WarpSize] = n_items; __syncthreads(); val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shared[threadIdx.x] : 0; @@ -1047,7 +1054,6 @@ DI i_t binaryBlockReduce(i_t val, i_t* shared) return -1; } - /** * @brief Simple utility function to determine whether user_stream or one of the * internal streams should be used. 
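As a reading aid for the reformatted reductions above, a minimal single-block argmin sketch (kernel and buffer names are illustrative; `blockRankedReduce` defaults to `raft::min_op`, and the shared buffer holds `WarpSize` values followed by `WarpSize` ranks, as the docs require):

// Hypothetical single-block argmin: thread 0 writes the smallest input value
// and the thread index (rank) that held it.
__global__ void block_argmin_kernel(const float* in, float* out_val, int* out_idx)
{
  __shared__ float shbuf[2 * raft::WarpSize];  // values followed by ranks
  float v   = in[threadIdx.x];
  auto best = raft::blockRankedReduce(v, shbuf, static_cast<int>(threadIdx.x));
  if (threadIdx.x == 0) {
    *out_val = best.first;   // reduced (minimum) value
    *out_idx = best.second;  // rank of the thread that held it
  }
}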
diff --git a/cpp/include/raft/util/device_loads_stores.cuh b/cpp/include/raft/util/device_loads_stores.cuh index fd7f937630..44559d247d 100644 --- a/cpp/include/raft/util/device_loads_stores.cuh +++ b/cpp/include/raft/util/device_loads_stores.cuh @@ -535,60 +535,60 @@ DI void ldg(int8_t (&x)[1], const int8_t* const& addr) } /** - * @brief Executes a 1D block strided copy + * @brief Executes a 1D block strided copy * @param dst destination pointer * @param src source pointer * @param size number of items to copy */ - template - DI void block_copy(T* dst, const T* src, const size_t size) - { - for (auto i = threadIdx.x; i < size; i += blockDim.x) { - dst[i] = src[i]; - } - } - - /** - * @brief Executes a 1D block strided copy +template +DI void block_copy(T* dst, const T* src, const size_t size) +{ + for (auto i = threadIdx.x; i < size; i += blockDim.x) { + dst[i] = src[i]; + } +} + +/** + * @brief Executes a 1D block strided copy * @param dst span of destination pointer * @param src span of source pointer * @param size number of items to copy */ - template - DI void block_copy(raft::device_span dst, - const raft::device_span src, - const size_t size) - { - assert(src.size() >= size); - assert(dst.size() >= size); - block_copy(dst.data(), src.data(), size); - } - - /** - * @brief Executes a 1D block strided copy +template +DI void block_copy(raft::device_span dst, + const raft::device_span src, + const size_t size) +{ + assert(src.size() >= size); + assert(dst.size() >= size); + block_copy(dst.data(), src.data(), size); +} + +/** + * @brief Executes a 1D block strided copy * @param dst span of destination pointer * @param src span of source pointer * @param size number of items to copy */ - template - DI void block_copy(raft::device_span dst, const raft::device_span src, const size_t size) - { - assert(src.size() >= size); - assert(dst.size() >= size); - block_copy(dst.data(), src.data(), size); - } - - /** - * @brief Executes a 1D block strided copy +template +DI void block_copy(raft::device_span dst, const raft::device_span src, const size_t size) +{ + assert(src.size() >= size); + assert(dst.size() >= size); + block_copy(dst.data(), src.data(), size); +} + +/** + * @brief Executes a 1D block strided copy * @param dst span of destination pointer * @param src span of source pointer */ - template - DI void block_copy(raft::device_span dst, const raft::device_span src) - { - assert(dst.size() >= src.size()); - block_copy(dst, src, src.size()); - } +template +DI void block_copy(raft::device_span dst, const raft::device_span src) +{ + assert(dst.size() >= src.size()); + block_copy(dst, src, src.size()); +} /** @} */ diff --git a/cpp/include/raft/util/pow2_utils.cuh b/cpp/include/raft/util/pow2_utils.cuh index 3268384586..1d42920bf7 100644 --- a/cpp/include/raft/util/pow2_utils.cuh +++ b/cpp/include/raft/util/pow2_utils.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,11 +90,11 @@ struct Pow2 { /** * Rounds up the value to next power of two. 
*/ - template - Pow2_FUNC_QUALIFIER Pow2_WHEN_INTEGRAL(I) round_up_pow2(I val) noexcept - { - return 1 << (log2(val) + 1); - } + template + Pow2_FUNC_QUALIFIER Pow2_WHEN_INTEGRAL(I) round_up_pow2(I val) noexcept + { + return 1 << (log2(val) + 1); + } /** * x modulo Value operation (remainder of the `div(x)`) diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index d009f4bd95..3a7666920e 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -1,18 +1,18 @@ /* -* Copyright (c) 2023, NVIDIA CORPORATION. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ #include "../test_utils.cuh" @@ -35,23 +35,24 @@ __global__ void test_reduction_kernel(const int* input, int* reduction_res, Redu assert(gridDim.x == 1); __shared__ int red_buf[raft::WarpSize]; int th_val = input[threadIdx.x]; - th_val = raft::blockReduce(th_val, (char*)red_buf, reduce_op); - if(threadIdx.x == 0){ - reduction_res[0] = th_val; - } + th_val = raft::blockReduce(th_val, (char*)red_buf, reduce_op); + if (threadIdx.x == 0) { reduction_res[0] = th_val; } } template -__global__ void test_ranked_reduction_kernel(const int* input, int* reduction_res, int* out_rank, ReduceLambda reduce_op) +__global__ void test_ranked_reduction_kernel(const int* input, + int* reduction_res, + int* out_rank, + ReduceLambda reduce_op) { assert(gridDim.x == 1); __shared__ int red_buf[2 * raft::WarpSize]; - int th_val = input[threadIdx.x]; + int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; - auto result = raft::blockRankedReduce(th_val, red_buf, th_rank, reduce_op); - if(threadIdx.x == 0){ + auto result = raft::blockRankedReduce(th_val, red_buf, th_rank, reduce_op); + if (threadIdx.x == 0) { reduction_res[0] = result.first; - out_rank[0] = result.second; + out_rank[0] = result.second; } } @@ -60,99 +61,104 @@ __global__ void test_random_reduction_kernel(const int* input, int* reduction_re assert(gridDim.x == 1); __shared__ int red_buf[2 * raft::WarpSize]; raft::random::PCGenerator thread_rng(1234, threadIdx.x, 0); - int th_val = input[threadIdx.x]; + int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; - int result = raft::blockRandomReduce(thread_rng,red_buf, th_val, th_rank); - if(threadIdx.x == 0){ - reduction_res[0] = result; - } + int result = raft::blockRandomReduce(thread_rng, red_buf, th_val, th_rank); + if (threadIdx.x == 0) { reduction_res[0] = result; } } -template +template __global__ void 
test_binary_reduction_kernel(const int* input, int* reduction_res) { assert(gridDim.x == 1); __shared__ int shared[TPB / WarpSize]; int th_val = input[threadIdx.x]; - int result = raft::binaryBlockReduce(th_val,shared); - if(threadIdx.x == 0){ - reduction_res[0] = result; - } + int result = raft::binaryBlockReduce(th_val, shared); + if (threadIdx.x == 0) { reduction_res[0] = result; } } - struct reduction_launch { template - static void run(const rmm::device_uvector& arr_d, int ref_val, ReduceLambda reduce_op, rmm::cuda_stream_view stream) + static void run(const rmm::device_uvector& arr_d, + int ref_val, + ReduceLambda reduce_op, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); - const int block_dim = 64; - const int grid_dim = 1; - test_reduction_kernel - <<>>(arr_d.data(), ref_d.data(),reduce_op); + const int block_dim = 64; + const int grid_dim = 1; + test_reduction_kernel<<>>( + arr_d.data(), ref_d.data(), reduce_op); stream.synchronize(); RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); } template - static void run_ranked(const rmm::device_uvector& arr_d, int ref_val, int rank_ref_val, ReduceLambda reduce_op, rmm::cuda_stream_view stream) + static void run_ranked(const rmm::device_uvector& arr_d, + int ref_val, + int rank_ref_val, + ReduceLambda reduce_op, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); rmm::device_scalar rank_d(stream); - const int block_dim = 64; - const int grid_dim = 1; - test_ranked_reduction_kernel - <<>>(arr_d.data(), ref_d.data(),rank_d.data(), reduce_op); + const int block_dim = 64; + const int grid_dim = 1; + test_ranked_reduction_kernel<<>>( + arr_d.data(), ref_d.data(), rank_d.data(), reduce_op); stream.synchronize(); RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); ASSERT_EQ(rank_d.value(stream), rank_ref_val); } - static void run_random(const rmm::device_uvector& arr_d, int ref_val, rmm::cuda_stream_view stream) + static void run_random(const rmm::device_uvector& arr_d, + int ref_val, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); - const int block_dim = 64; - const int grid_dim = 1; - test_random_reduction_kernel - <<>>(arr_d.data(), ref_d.data()); + const int block_dim = 64; + const int grid_dim = 1; + test_random_reduction_kernel<<>>(arr_d.data(), ref_d.data()); stream.synchronize(); RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); } - static void run_binary(const rmm::device_uvector& arr_d, int ref_val, rmm::cuda_stream_view stream) + static void run_binary(const rmm::device_uvector& arr_d, + int ref_val, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); - constexpr int block_dim = 64; - const int grid_dim = 1; + constexpr int block_dim = 64; + const int grid_dim = 1; test_binary_reduction_kernel <<>>(arr_d.data(), ref_d.data()); stream.synchronize(); RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); } - }; template -class ReductionTest : public testing::TestWithParam> { // NOLINT +class ReductionTest : public testing::TestWithParam> { // NOLINT protected: - const std::vector input; // NOLINT + const std::vector input; // NOLINT rmm::cuda_stream_view stream; // NOLINT - rmm::device_uvector arr_d; // NOLINT - + rmm::device_uvector arr_d; // NOLINT + public: explicit ReductionTest() - : input(testing::TestWithParam>::GetParam()) , stream(rmm::cuda_stream_default), arr_d(input.size(), stream) - { + : input(testing::TestWithParam>::GetParam()), + 
stream(rmm::cuda_stream_default), + arr_d(input.size(), stream) + { update_device(arr_d.data(), input.data(), input.size(), stream); - - } - void run_reduction(){ + void run_reduction() + { // calculate the results reduction_launch::run(arr_d, 0, raft::min_op{}, stream); reduction_launch::run(arr_d, 5, raft::max_op{}, stream); @@ -160,25 +166,25 @@ class ReductionTest : public testing::TestWithParam> { // N reduction_launch::run_ranked(arr_d, 5, 15, raft::max_op{}, stream); reduction_launch::run_ranked(arr_d, 0, 26, raft::min_op{}, stream); // value 15 is for the current state of PCgenerator. adjust this if rng changes - reduction_launch::run_random(arr_d, 15,stream); - } - - void run_binary_reduction(){ - reduction_launch::run_binary(arr_d, 24,stream); + reduction_launch::run_random(arr_d, 15, stream); } + void run_binary_reduction() { reduction_launch::run_binary(arr_d, 24, stream); } }; -const std::vector test_vector{1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,5, 1, 2, 3,4, 1, 2, 3,4,1, 2, 0,4, 1, 2, 3,4, 1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4,1, 2, 3,4, 1, 2, 3,4}; -const std::vector binary_test_vector{1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0,1,0,0,0,1,1,0,0}; -auto reduction_input = ::testing::Values(test_vector); +const std::vector test_vector{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 5, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 0, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, + 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}; +const std::vector binary_test_vector{ + 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, + 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0}; +auto reduction_input = ::testing::Values(test_vector); auto binary_reduction_input = ::testing::Values(binary_test_vector); - -using ReductionTestInt = ReductionTest; // NOLINT -TEST_P(ReductionTestInt, REDUCTIONS) {run_reduction();} -INSTANTIATE_TEST_CASE_P(ReductionTest,ReductionTestInt, reduction_input); // NOLINT -TEST_P(ReductionTestInt, BINARY_REDUCTION) {run_binary_reduction();} // NOLINT -INSTANTIATE_TEST_CASE_P(BinaryReductionTest,ReductionTestInt, binary_reduction_input); // NOLINT +using ReductionTestInt = ReductionTest; // NOLINT +TEST_P(ReductionTestInt, REDUCTIONS) { run_reduction(); } +INSTANTIATE_TEST_CASE_P(ReductionTest, ReductionTestInt, reduction_input); // NOLINT +TEST_P(ReductionTestInt, BINARY_REDUCTION) { run_binary_reduction(); } // NOLINT +INSTANTIATE_TEST_CASE_P(BinaryReductionTest, ReductionTestInt, binary_reduction_input); // NOLINT } // namespace raft::util From fef36e3d199a821a2223063414aaf446f2f375ec Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 27 Apr 2023 08:59:22 -0700 Subject: [PATCH 04/20] add tests for binary reduction and separate warp primitives --- cpp/include/raft/util/cuda_utils.cuh | 471 +--------------------- cpp/include/raft/util/reduction.cuh | 266 ++++++++++++ cpp/include/raft/util/warp_primitives.cuh | 259 ++++++++++++ cpp/test/CMakeLists.txt | 22 +- cpp/test/util/reduction.cu | 13 +- 5 files changed, 546 insertions(+), 485 deletions(-) create mode 100644 cpp/include/raft/util/reduction.cuh create mode 100644 cpp/include/raft/util/warp_primitives.cuh diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh index 485f3c4658..e211d41e2f 100644 --- a/cpp/include/raft/util/cuda_utils.cuh +++ b/cpp/include/raft/util/cuda_utils.cuh @@ -24,6 +24,7 @@ 
#include #include #include +#include namespace raft { @@ -523,238 +524,6 @@ DI double maxPrim(double x, double y) } /** @} */ -/** apply a warp-wide fence (useful from Volta+ archs) */ -DI void warpFence() -{ -#if __CUDA_ARCH__ >= 700 - __syncwarp(); -#endif -} - -/** warp-wide any boolean aggregator */ -DI bool any(bool inFlag, uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - inFlag = __any_sync(mask, inFlag); -#else - inFlag = __any(inFlag); -#endif - return inFlag; -} - -/** warp-wide all boolean aggregator */ -DI bool all(bool inFlag, uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - inFlag = __all_sync(mask, inFlag); -#else - inFlag = __all(inFlag); -#endif - return inFlag; -} - -/** For every thread in the warp, set the corresponding bit to the thread's flag value. */ -DI uint32_t ballot(bool inFlag, uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - return __ballot_sync(mask, inFlag); -#else - return __ballot(inFlag); -#endif -} - -/** True CUDA alignment of a type (adapted from CUB) */ -template -struct cuda_alignment { - struct Pad { - T val; - char byte; - }; - - static constexpr int bytes = sizeof(Pad) - sizeof(T); -}; - -template -struct is_multiple { - static constexpr int large_align_bytes = cuda_alignment::bytes; - static constexpr int unit_align_bytes = cuda_alignment::bytes; - static constexpr bool value = - (sizeof(LargeT) % sizeof(UnitT) == 0) && (large_align_bytes % unit_align_bytes == 0); -}; - -template -inline constexpr bool is_multiple_v = is_multiple::value; - -template -struct is_shuffleable { - static constexpr bool value = - std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v || std::is_same_v; -}; - -template -inline constexpr bool is_shuffleable_v = is_shuffleable::value; - -/** - * @brief Shuffle the data inside a warp - * @tparam T the data type - * @param val value to be shuffled - * @param srcLane lane from where to shuffle - * @param width lane width - * @param mask mask of participating threads (Volta+) - * @return the shuffled data - */ -template -DI std::enable_if_t, T> shfl(T val, - int srcLane, - int width = WarpSize, - uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - return __shfl_sync(mask, val, srcLane, width); -#else - return __shfl(val, srcLane, width); -#endif -} - -/// Overload of shfl for data types not supported by the CUDA intrinsics -template -DI std::enable_if_t, T> shfl(T val, - int srcLane, - int width = WarpSize, - uint32_t mask = 0xffffffffu) -{ - using UnitT = - std::conditional_t, - unsigned int, - std::conditional_t, unsigned short, unsigned char>>; - - constexpr int n_words = sizeof(T) / sizeof(UnitT); - - T output; - UnitT* output_alias = reinterpret_cast(&output); - UnitT* input_alias = reinterpret_cast(&val); - - unsigned int shuffle_word; - shuffle_word = shfl((unsigned int)input_alias[0], srcLane, width, mask); - output_alias[0] = shuffle_word; - -#pragma unroll - for (int i = 1; i < n_words; ++i) { - shuffle_word = shfl((unsigned int)input_alias[i], srcLane, width, mask); - output_alias[i] = shuffle_word; - } - - return output; -} - -/** - * @brief Shuffle the data inside a warp from lower lane IDs - * @tparam T the data type - * @param val value to be shuffled - * @param delta lower lane ID delta from where to shuffle - * @param width lane width - * @param mask mask of participating threads (Volta+) - * @return the shuffled data - */ -template -DI std::enable_if_t, T> shfl_up(T val, - int delta, - int 
width = WarpSize, - uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - return __shfl_up_sync(mask, val, delta, width); -#else - return __shfl_up(val, delta, width); -#endif -} - -/// Overload of shfl_up for data types not supported by the CUDA intrinsics -template -DI std::enable_if_t, T> shfl_up(T val, - int delta, - int width = WarpSize, - uint32_t mask = 0xffffffffu) -{ - using UnitT = - std::conditional_t, - unsigned int, - std::conditional_t, unsigned short, unsigned char>>; - - constexpr int n_words = sizeof(T) / sizeof(UnitT); - - T output; - UnitT* output_alias = reinterpret_cast(&output); - UnitT* input_alias = reinterpret_cast(&val); - - unsigned int shuffle_word; - shuffle_word = shfl_up((unsigned int)input_alias[0], delta, width, mask); - output_alias[0] = shuffle_word; - -#pragma unroll - for (int i = 1; i < n_words; ++i) { - shuffle_word = shfl_up((unsigned int)input_alias[i], delta, width, mask); - output_alias[i] = shuffle_word; - } - - return output; -} - -/** - * @brief Shuffle the data inside a warp - * @tparam T the data type - * @param val value to be shuffled - * @param laneMask mask to be applied in order to perform xor shuffle - * @param width lane width - * @param mask mask of participating threads (Volta+) - * @return the shuffled data - */ -template -DI std::enable_if_t, T> shfl_xor(T val, - int laneMask, - int width = WarpSize, - uint32_t mask = 0xffffffffu) -{ -#if CUDART_VERSION >= 9000 - return __shfl_xor_sync(mask, val, laneMask, width); -#else - return __shfl_xor(val, laneMask, width); -#endif -} - -/// Overload of shfl_xor for data types not supported by the CUDA intrinsics -template -DI std::enable_if_t, T> shfl_xor(T val, - int laneMask, - int width = WarpSize, - uint32_t mask = 0xffffffffu) -{ - using UnitT = - std::conditional_t, - unsigned int, - std::conditional_t, unsigned short, unsigned char>>; - - constexpr int n_words = sizeof(T) / sizeof(UnitT); - - T output; - UnitT* output_alias = reinterpret_cast(&output); - UnitT* input_alias = reinterpret_cast(&val); - - unsigned int shuffle_word; - shuffle_word = shfl_xor((unsigned int)input_alias[0], laneMask, width, mask); - output_alias[0] = shuffle_word; - -#pragma unroll - for (int i = 1; i < n_words; ++i) { - shuffle_word = shfl_xor((unsigned int)input_alias[i], laneMask, width, mask); - output_alias[i] = shuffle_word; - } - - return output; -} - /** * @brief Four-way byte dot product-accumulate. * @tparam T Four-byte integer: int or unsigned int @@ -816,244 +585,6 @@ DI auto dp4a(unsigned int a, unsigned int b, unsigned int c) -> unsigned int #endif } -/** - * @brief Logical-warp-level reduction - * @tparam logicalWarpSize Logical warp size (2, 4, 8, 16 or 32) - * @tparam T Value type to be reduced - * @tparam ReduceLambda Reduction operation type - * @param val input value - * @param reduce_op Reduction operation - * @return Reduction result. All lanes will have the valid result. - */ -template -DI T logicalWarpReduce(T val, ReduceLambda reduce_op) -{ -#pragma unroll - for (int i = logicalWarpSize / 2; i > 0; i >>= 1) { - T tmp = shfl_xor(val, i); - val = reduce_op(val, tmp); - } - return val; -} - -/** - * @brief Warp-level reduction - * @tparam T Value type to be reduced - * @tparam ReduceLambda Reduction operation type - * @param val input value - * @param reduce_op Reduction operation - * @return Reduction result. All lanes will have the valid result. - * @note Why not cub? Because cub doesn't seem to allow working with arbitrary - * number of warps in a block. 
All threads in the warp must enter this - * function together - */ -template -DI T warpReduce(T val, ReduceLambda reduce_op) -{ - return logicalWarpReduce(val, reduce_op); -} - -/** - * @brief Warp-level reduction - * @tparam T Value type to be reduced - * @param val input value - * @return Reduction result. All lanes will have the valid result. - * @note Why not cub? Because cub doesn't seem to allow working with arbitrary - * number of warps in a block. All threads in the warp must enter this - * function together - */ -template -DI T warpReduce(T val) -{ - return warpReduce(val, raft::add_op{}); -} - -/** - * @brief 1-D block-level reduction - * @param val input value - * @param smem shared memory region needed for storing intermediate results. It - * must alteast be of size: `sizeof(T) * nWarps` - * @param reduce_op a binary reduction operation. - * @return only the thread0 will contain valid reduced result - * @note Why not cub? Because cub doesn't seem to allow working with arbitrary - * number of warps in a block. All threads in the block must enter this - * function together. cub also uses too many registers - */ -template -DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) -{ - auto* sTemp = reinterpret_cast(smem); - int nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - int lid = laneId(); - int wid = threadIdx.x / WarpSize; - val = warpReduce(val, reduce_op); - if (lid == 0) sTemp[wid] = val; - __syncthreads(); - val = lid < nWarps ? sTemp[lid] : T(0); - return warpReduce(val, reduce_op); -} - -/** - * @brief 1-D warp-level weighted random reduction which returns the rank. - * used conditional probability to return the weighted random rank of the result. - * @param rng random number generator, must have next_u32() function - * @param weight weight of the rank/index. - * @param idx index to be used as rank - * @return only the thread0 will contain valid reduced result - */ -template -__device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) -{ -#pragma unroll - for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { - T tmp_weight = shfl(weight, laneId() + offset); - i_t tmp_idx = shfl(idx, laneId() + offset); - T sum = (tmp_weight + weight); - weight = sum; - if (sum != 0) { - i_t rnd_number = (rng.next_u32() % sum); - if (rnd_number < tmp_weight) { idx = tmp_idx; } - } - } -} - -/** - * @brief 1-D block-level weighted random reduction which returns the rank. - * used conditional probability to return the weighted random rank of the result. - * @param rng random number generator, must have next_u32() function - * @param shbuf shared memory region needed for storing intermediate results. It - * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` - * @param weight weight of the rank/index. 
- * @param idx index to be used as rank - * @return only the thread0 will contain valid reduced result - */ -template -__inline__ __device__ i_t -blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) -{ - T* values = shbuf; - i_t* indices = (i_t*)&shbuf[WarpSize]; - i_t wid = threadIdx.x / WarpSize; - i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - warpRandomReduce(rng, weight, idx); // Each warp performs partial reduction - i_t lane = laneId(); - if (lane == 0) { - values[wid] = weight; // Write reduced value to shared memory - indices[wid] = idx; // Write reduced value to shared memory - } - - __syncthreads(); // Wait for all partial reductions - - // read from shared memory only if that warp existed - if (lane < nWarps) { - weight = values[lane]; - idx = indices[lane]; - } else { - // get the min if it is a max op, get the max if it is a min op - weight = 0; - idx = -1; - } - __syncthreads(); - if (wid == 0) warpRandomReduce(rng, weight, idx); - return idx; -} - -/** - * @brief 1-D warp-level ranked reduction which returns the value and rank. - * thread 0 will have valid result and rank(idx). - * @param val input value - * @param idx index to be used as rank - * @param reduce_op a binary reduction operation. - * @return only the thread0 will contain valid reduced result - */ -template -__inline__ __device__ void warpRankedReduce(T& val, - i_t& idx, - ReduceLambda reduce_op = raft::min_op{}) -{ -#pragma unroll - for (i_t offset = WarpSize / 2; offset > 0; offset /= 2) { - T tmpVal = shfl(val, laneId() + offset); - i_t tmpIdx = shfl(idx, laneId() + offset); - if (reduce_op(tmpVal, val) == tmpVal) { - val = tmpVal; - idx = tmpIdx; - } - } -} - -/** - * @brief 1-D block-level ranked reduction which returns the value and rank. - * thread 0 will have valid result and rank(idx). - * @param val input value - * @param smem shared memory region needed for storing intermediate results. It - * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` - * @param idx index to be used as rank - * @param reduce_op a binary reduction operation. - * @return only the thread0 will contain valid reduced result - */ -template -__inline__ __device__ std::pair blockRankedReduce(T val, - T* shbuf, - i_t idx = threadIdx.x, - ReduceLambda reduce_op = raft::min_op{}) -{ - T* values = shbuf; - i_t* indices = (i_t*)&shbuf[WarpSize]; - i_t wid = threadIdx.x / WarpSize; - i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - warpRankedReduce(val, idx, reduce_op); // Each warp performs partial reduction - i_t lane = laneId(); - if (lane == 0) { - values[wid] = val; // Write reduced value to shared memory - indices[wid] = idx; // Write reduced value to shared memory - } - - __syncthreads(); // Wait for all partial reductions - - // read from shared memory only if that warp existed - if (lane < nWarps) { - val = values[lane]; - idx = indices[lane]; - } else { - // get the min if it is a max op, get the max if it is a min op - val = reduce_op(std::numeric_limits::min(), std::numeric_limits::max()) == - std::numeric_limits::min() - ? std::numeric_limits::max() - : std::numeric_limits::min(); - idx = -1; - } - __syncthreads(); - if (wid == 0) warpRankedReduce(val, idx, reduce_op); - return std::pair{val, idx}; -} - -/** - * @brief Executes a 1d binary block reduce - * @param val binary value to be reduced across the thread block - * @param reduce_op a binary reduction operation. 
- * @return only the thread0 will contain valid reduced result - */ -template -DI i_t binaryBlockReduce(i_t val, i_t* shared) -{ - static_assert(BLOCK_SIZE <= 1024); - assert(val == 0 || val == 1); - const uint32_t mask = __ballot_sync(~0, val); - const uint32_t n_items = __popc(mask); - - // Each first thread of the warp - if (threadIdx.x % WarpSize == 0) shared[threadIdx.x / WarpSize] = n_items; - __syncthreads(); - - val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shared[threadIdx.x] : 0; - - if (threadIdx.x < WarpSize) - return warpReduce(val); - else // Only first warp gets the results - return -1; -} - /** * @brief Simple utility function to determine whether user_stream or one of the * internal streams should be used. diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh new file mode 100644 index 0000000000..a41491b92a --- /dev/null +++ b/cpp/include/raft/util/reduction.cuh @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include + +namespace raft { + +/** + * @brief Logical-warp-level reduction + * @tparam logicalWarpSize Logical warp size (2, 4, 8, 16 or 32) + * @tparam T Value type to be reduced + * @tparam ReduceLambda Reduction operation type + * @param val input value + * @param reduce_op Reduction operation + * @return Reduction result. All lanes will have the valid result. + */ +template +DI T logicalWarpReduce(T val, ReduceLambda reduce_op) +{ +#pragma unroll + for (int i = logicalWarpSize / 2; i > 0; i >>= 1) { + T tmp = shfl_xor(val, i); + val = reduce_op(val, tmp); + } + return val; +} + +/** + * @brief Warp-level reduction + * @tparam T Value type to be reduced + * @tparam ReduceLambda Reduction operation type + * @param val input value + * @param reduce_op Reduction operation + * @return Reduction result. All lanes will have the valid result. + * @note Why not cub? Because cub doesn't seem to allow working with arbitrary + * number of warps in a block. All threads in the warp must enter this + * function together + */ +template +DI T warpReduce(T val, ReduceLambda reduce_op) +{ + return logicalWarpReduce(val, reduce_op); +} + +/** + * @brief Warp-level reduction + * @tparam T Value type to be reduced + * @param val input value + * @return Reduction result. All lanes will have the valid result. + * @note Why not cub? Because cub doesn't seem to allow working with arbitrary + * number of warps in a block. All threads in the warp must enter this + * function together + */ +template +DI T warpReduce(T val) +{ + return warpReduce(val, raft::add_op{}); +} + +/** + * @brief 1-D block-level reduction + * @param val input value + * @param smem shared memory region needed for storing intermediate results. It + * must alteast be of size: `sizeof(T) * nWarps` + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + * @note Why not cub? 
Because cub doesn't seem to allow working with arbitrary + * number of warps in a block. All threads in the block must enter this + * function together. cub also uses too many registers + */ +template +DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) +{ + auto* sTemp = reinterpret_cast(smem); + int nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + int lid = laneId(); + int wid = threadIdx.x / WarpSize; + val = warpReduce(val, reduce_op); + if (lid == 0) sTemp[wid] = val; + __syncthreads(); + val = lid < nWarps ? sTemp[lid] : T(0); + return warpReduce(val, reduce_op); +} + +/** + * @brief 1-D warp-level weighted random reduction which returns the rank. + * used conditional probability to return the weighted random rank of the result. + * @param rng random number generator, must have next_u32() function + * @param weight weight of the rank/index. + * @param idx index to be used as rank + * @return only the thread0 will contain valid reduced result + */ +template +__device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) +{ +#pragma unroll + for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { + T tmp_weight = shfl(weight, laneId() + offset); + i_t tmp_idx = shfl(idx, laneId() + offset); + T sum = (tmp_weight + weight); + weight = sum; + if (sum != 0) { + i_t rnd_number = (rng.next_u32() % sum); + if (rnd_number < tmp_weight) { idx = tmp_idx; } + } + } +} + +/** + * @brief 1-D block-level weighted random reduction which returns the rank. + * used conditional probability to return the weighted random rank of the result. + * @param rng random number generator, must have next_u32() function + * @param shbuf shared memory region needed for storing intermediate results. It + * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` + * @param weight weight of the rank/index. + * @param idx index to be used as rank + * @return only the thread0 will contain valid reduced result + */ +template +__inline__ __device__ i_t +blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) +{ + T* values = shbuf; + i_t* indices = (i_t*)&shbuf[WarpSize]; + i_t wid = threadIdx.x / WarpSize; + i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + warpRandomReduce(rng, weight, idx); // Each warp performs partial reduction + i_t lane = laneId(); + if (lane == 0) { + values[wid] = weight; // Write reduced value to shared memory + indices[wid] = idx; // Write reduced value to shared memory + } + + __syncthreads(); // Wait for all partial reductions + + // read from shared memory only if that warp existed + if (lane < nWarps) { + weight = values[lane]; + idx = indices[lane]; + } else { + // get the min if it is a max op, get the max if it is a min op + weight = 0; + idx = -1; + } + __syncthreads(); + if (wid == 0) warpRandomReduce(rng, weight, idx); + return idx; +} + +/** + * @brief 1-D warp-level ranked reduction which returns the value and rank. + * thread 0 will have valid result and rank(idx). + * @param val input value + * @param idx index to be used as rank + * @param reduce_op a binary reduction operation. 
+ * @return only the thread0 will contain valid reduced result + */ +template +__inline__ __device__ void warpRankedReduce(T& val, + i_t& idx, + ReduceLambda reduce_op = raft::min_op{}) +{ +#pragma unroll + for (i_t offset = WarpSize / 2; offset > 0; offset /= 2) { + T tmpVal = shfl(val, laneId() + offset); + i_t tmpIdx = shfl(idx, laneId() + offset); + if (reduce_op(tmpVal, val) == tmpVal) { + val = tmpVal; + idx = tmpIdx; + } + } +} + +/** + * @brief 1-D block-level ranked reduction which returns the value and rank. + * thread 0 will have valid result and rank(idx). + * @param val input value + * @param smem shared memory region needed for storing intermediate results. It + * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` + * @param idx index to be used as rank + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + */ +template +__inline__ __device__ std::pair blockRankedReduce(T val, + T* shbuf, + i_t idx = threadIdx.x, + ReduceLambda reduce_op = raft::min_op{}) +{ + T* values = shbuf; + i_t* indices = (i_t*)&shbuf[WarpSize]; + i_t wid = threadIdx.x / WarpSize; + i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; + warpRankedReduce(val, idx, reduce_op); // Each warp performs partial reduction + i_t lane = laneId(); + if (lane == 0) { + values[wid] = val; // Write reduced value to shared memory + indices[wid] = idx; // Write reduced value to shared memory + } + + __syncthreads(); // Wait for all partial reductions + + // read from shared memory only if that warp existed + if (lane < nWarps) { + val = values[lane]; + idx = indices[lane]; + } else { + // get the min if it is a max op, get the max if it is a min op + val = reduce_op(std::numeric_limits::min(), std::numeric_limits::max()) == + std::numeric_limits::min() + ? std::numeric_limits::max() + : std::numeric_limits::min(); + idx = -1; + } + __syncthreads(); + if (wid == 0) warpRankedReduce(val, idx, reduce_op); + return std::pair{val, idx}; +} + +/** + * @brief Executes a 1d binary block reduce + * @param val binary value to be reduced across the thread block + * @param reduce_op a binary reduction operation. + * @return only the thread0 will contain valid reduced result + */ +template +DI i_t binaryBlockReduce(i_t val, i_t* shared) +{ + static_assert(BLOCK_SIZE <= 1024); + assert(val == 0 || val == 1); + const uint32_t mask = __ballot_sync(~0, val); + const uint32_t n_items = __popc(mask); + + // Each first thread of the warp + if (threadIdx.x % WarpSize == 0) shared[threadIdx.x / WarpSize] = n_items; + __syncthreads(); + + val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shared[threadIdx.x] : 0; + + if (threadIdx.x < WarpSize) + return warpReduce(val); + else // Only first warp gets the results + return -1; +} + +} // namespace raft \ No newline at end of file diff --git a/cpp/include/raft/util/warp_primitives.cuh b/cpp/include/raft/util/warp_primitives.cuh new file mode 100644 index 0000000000..94fddbe0f3 --- /dev/null +++ b/cpp/include/raft/util/warp_primitives.cuh @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
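blockRankedReduce above turns the value reduction into an argmin/argmax that also reports which thread supplied the winner. A sketch of a block-wide argmin; the names are illustrative, and T and i_t are both 4 bytes wide here, so a single float array covers the value and index halves of shbuf.

#include <raft/util/reduction.cuh>

// Thread 0 receives the minimum value and the rank (thread index) that held it.
__global__ void block_argmin_sketch(const float* in, float* min_out, int* idx_out)
{
  __shared__ float shbuf[2 * raft::WarpSize];  // (sizeof(T) + sizeof(i_t)) * WarpSize bytes
  float v     = in[threadIdx.x];
  auto result = raft::blockRankedReduce(v, shbuf, static_cast<int>(threadIdx.x), raft::min_op{});
  if (threadIdx.x == 0) {
    *min_out = result.first;   // reduced value
    *idx_out = result.second;  // rank of the thread that supplied it
  }
}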
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +namespace raft { + +/** True CUDA alignment of a type (adapted from CUB) */ +template +struct cuda_alignment { + struct Pad { + T val; + char byte; + }; + + static constexpr int bytes = sizeof(Pad) - sizeof(T); +}; + +template +struct is_multiple { + static constexpr int large_align_bytes = cuda_alignment::bytes; + static constexpr int unit_align_bytes = cuda_alignment::bytes; + static constexpr bool value = + (sizeof(LargeT) % sizeof(UnitT) == 0) && (large_align_bytes % unit_align_bytes == 0); +}; + +template +inline constexpr bool is_multiple_v = is_multiple::value; + +/** apply a warp-wide fence (useful from Volta+ archs) */ +DI void warpFence() +{ +#if __CUDA_ARCH__ >= 700 + __syncwarp(); +#endif +} + +/** warp-wide any boolean aggregator */ +DI bool any(bool inFlag, uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + inFlag = __any_sync(mask, inFlag); +#else + inFlag = __any(inFlag); +#endif + return inFlag; +} + +/** warp-wide all boolean aggregator */ +DI bool all(bool inFlag, uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + inFlag = __all_sync(mask, inFlag); +#else + inFlag = __all(inFlag); +#endif + return inFlag; +} + +/** For every thread in the warp, set the corresponding bit to the thread's flag value. */ +DI uint32_t ballot(bool inFlag, uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + return __ballot_sync(mask, inFlag); +#else + return __ballot(inFlag); +#endif +} + +template +struct is_shuffleable { + static constexpr bool value = + std::is_same_v || std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || std::is_same_v; +}; + +template +inline constexpr bool is_shuffleable_v = is_shuffleable::value; + +/** + * @brief Shuffle the data inside a warp + * @tparam T the data type + * @param val value to be shuffled + * @param srcLane lane from where to shuffle + * @param width lane width + * @param mask mask of participating threads (Volta+) + * @return the shuffled data + */ +template +DI std::enable_if_t, T> shfl(T val, + int srcLane, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + return __shfl_sync(mask, val, srcLane, width); +#else + return __shfl(val, srcLane, width); +#endif +} + +/// Overload of shfl for data types not supported by the CUDA intrinsics +template +DI std::enable_if_t, T> shfl(T val, + int srcLane, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ + using UnitT = + std::conditional_t, + unsigned int, + std::conditional_t, unsigned short, unsigned char>>; + + constexpr int n_words = sizeof(T) / sizeof(UnitT); + + T output; + UnitT* output_alias = reinterpret_cast(&output); + UnitT* input_alias = reinterpret_cast(&val); + + unsigned int shuffle_word; + shuffle_word = shfl((unsigned int)input_alias[0], srcLane, width, mask); + output_alias[0] = shuffle_word; + +#pragma unroll + for (int i = 1; i < n_words; ++i) { + shuffle_word = shfl((unsigned int)input_alias[i], srcLane, width, mask); + output_alias[i] = shuffle_word; + } + + return 
output; +} + +/** + * @brief Shuffle the data inside a warp from lower lane IDs + * @tparam T the data type + * @param val value to be shuffled + * @param delta lower lane ID delta from where to shuffle + * @param width lane width + * @param mask mask of participating threads (Volta+) + * @return the shuffled data + */ +template +DI std::enable_if_t, T> shfl_up(T val, + int delta, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + return __shfl_up_sync(mask, val, delta, width); +#else + return __shfl_up(val, delta, width); +#endif +} + +/// Overload of shfl_up for data types not supported by the CUDA intrinsics +template +DI std::enable_if_t, T> shfl_up(T val, + int delta, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ + using UnitT = + std::conditional_t, + unsigned int, + std::conditional_t, unsigned short, unsigned char>>; + + constexpr int n_words = sizeof(T) / sizeof(UnitT); + + T output; + UnitT* output_alias = reinterpret_cast(&output); + UnitT* input_alias = reinterpret_cast(&val); + + unsigned int shuffle_word; + shuffle_word = shfl_up((unsigned int)input_alias[0], delta, width, mask); + output_alias[0] = shuffle_word; + +#pragma unroll + for (int i = 1; i < n_words; ++i) { + shuffle_word = shfl_up((unsigned int)input_alias[i], delta, width, mask); + output_alias[i] = shuffle_word; + } + + return output; +} + +/** + * @brief Shuffle the data inside a warp + * @tparam T the data type + * @param val value to be shuffled + * @param laneMask mask to be applied in order to perform xor shuffle + * @param width lane width + * @param mask mask of participating threads (Volta+) + * @return the shuffled data + */ +template +DI std::enable_if_t, T> shfl_xor(T val, + int laneMask, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + return __shfl_xor_sync(mask, val, laneMask, width); +#else + return __shfl_xor(val, laneMask, width); +#endif +} + +/// Overload of shfl_xor for data types not supported by the CUDA intrinsics +template +DI std::enable_if_t, T> shfl_xor(T val, + int laneMask, + int width = WarpSize, + uint32_t mask = 0xffffffffu) +{ + using UnitT = + std::conditional_t, + unsigned int, + std::conditional_t, unsigned short, unsigned char>>; + + constexpr int n_words = sizeof(T) / sizeof(UnitT); + + T output; + UnitT* output_alias = reinterpret_cast(&output); + UnitT* input_alias = reinterpret_cast(&val); + + unsigned int shuffle_word; + shuffle_word = shfl_xor((unsigned int)input_alias[0], laneMask, width, mask); + output_alias[0] = shuffle_word; + +#pragma unroll + for (int i = 1; i < n_words; ++i) { + shuffle_word = shfl_xor((unsigned int)input_alias[i], laneMask, width, mask); + output_alias[i] = shuffle_word; + } + + return output; +} + +} // namespace raft \ No newline at end of file diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index c8d4f91ec0..81c87caf63 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -245,14 +245,8 @@ if(BUILD_TESTS) ) ConfigureTest( - NAME - SPARSE_DIST_TEST - PATH - test/sparse/dist_coo_spmv.cu - test/sparse/distance.cu - test/sparse/gram.cu - OPTIONAL - LIB + NAME SPARSE_DIST_TEST PATH test/sparse/dist_coo_spmv.cu test/sparse/distance.cu + test/sparse/gram.cu OPTIONAL LIB ) ConfigureTest( @@ -319,7 +313,15 @@ if(BUILD_TESTS) ) ConfigureTest( - NAME UTILS_TEST PATH test/core/seive.cu test/util/bitonic_sort.cu test/util/cudart_utils.cpp - test/util/device_atomics.cu test/util/integer_utils.cpp test/util/pow2_utils.cu + NAME + 
UTILS_TEST + PATH + test/core/seive.cu + test/util/bitonic_sort.cu + test/util/cudart_utils.cpp + test/util/device_atomics.cu + test/util/integer_utils.cpp + test/util/pow2_utils.cu + test/util/reduction.cu ) endif() diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index 3a7666920e..4405e7ae3c 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -17,7 +17,7 @@ #include "../test_utils.cuh" #include -#include +#include #include #include @@ -181,10 +181,13 @@ const std::vector binary_test_vector{ auto reduction_input = ::testing::Values(test_vector); auto binary_reduction_input = ::testing::Values(binary_test_vector); -using ReductionTestInt = ReductionTest; // NOLINT +using ReductionTestInt = ReductionTest; // NOLINT +using BinaryReductionTestInt = ReductionTest; // NOLINT TEST_P(ReductionTestInt, REDUCTIONS) { run_reduction(); } -INSTANTIATE_TEST_CASE_P(ReductionTest, ReductionTestInt, reduction_input); // NOLINT -TEST_P(ReductionTestInt, BINARY_REDUCTION) { run_binary_reduction(); } // NOLINT -INSTANTIATE_TEST_CASE_P(BinaryReductionTest, ReductionTestInt, binary_reduction_input); // NOLINT +INSTANTIATE_TEST_CASE_P(ReductionTest, ReductionTestInt, reduction_input); // NOLINT +TEST_P(BinaryReductionTestInt, BINARY_REDUCTION) { run_binary_reduction(); } // NOLINT +INSTANTIATE_TEST_CASE_P(BinaryReductionTest, + BinaryReductionTestInt, + binary_reduction_input); // NOLINT } // namespace raft::util From d856c3b1b2d46614186973cf94a79ee59eae24ad Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 27 Apr 2023 09:14:39 -0700 Subject: [PATCH 05/20] remove next_pow2 function --- cpp/include/raft/util/pow2_utils.cuh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cpp/include/raft/util/pow2_utils.cuh b/cpp/include/raft/util/pow2_utils.cuh index 1d42920bf7..68b35837b6 100644 --- a/cpp/include/raft/util/pow2_utils.cuh +++ b/cpp/include/raft/util/pow2_utils.cuh @@ -20,12 +20,6 @@ namespace raft { -template -constexpr i_t next_pow2(i_t val) -{ - return 1 << (log2(val) + 1); -} - /** * @brief Fast arithmetics and alignment checks for power-of-two values known at compile time. 
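With next_pow2 removed, callers can fall back to the standard bit-smearing idiom. A standalone sketch is below; note that the removed helper computed 1 << (log2(val) + 1), the power of two strictly greater than val, so the two differ when val is already a power of two.

#include <cstdint>

// Smallest power of two >= v, valid for v in [1, 2^31]. Purely illustrative.
constexpr uint32_t round_up_pow2_sketch(uint32_t v)
{
  v--;
  v |= v >> 1;
  v |= v >> 2;
  v |= v >> 4;
  v |= v >> 8;
  v |= v >> 16;
  return v + 1;
}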
* From b4fe4703b9d4ea84b2da359df87a87ad9ce2ab78 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Fri, 28 Apr 2023 02:05:02 -0700 Subject: [PATCH 06/20] add device span include --- cpp/include/raft/util/device_loads_stores.cuh | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/include/raft/util/device_loads_stores.cuh b/cpp/include/raft/util/device_loads_stores.cuh index 44559d247d..62fa349a25 100644 --- a/cpp/include/raft/util/device_loads_stores.cuh +++ b/cpp/include/raft/util/device_loads_stores.cuh @@ -18,6 +18,7 @@ #include // uintX_t #include // DI +#include namespace raft { From a2e5608493dbec6b7924bb6ff39d716fb34f7167 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Fri, 28 Apr 2023 02:10:46 -0700 Subject: [PATCH 07/20] fix style include order --- cpp/include/raft/util/device_loads_stores.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/raft/util/device_loads_stores.cuh b/cpp/include/raft/util/device_loads_stores.cuh index 62fa349a25..e3d54c51f5 100644 --- a/cpp/include/raft/util/device_loads_stores.cuh +++ b/cpp/include/raft/util/device_loads_stores.cuh @@ -17,8 +17,8 @@ #pragma once #include // uintX_t -#include // DI #include +#include // DI namespace raft { From 91dbcc26281d26be4a0d172d03c20d3d888eddd9 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Fri, 28 Apr 2023 05:49:31 -0700 Subject: [PATCH 08/20] add default template type --- cpp/include/raft/util/reduction.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index a41491b92a..35b0802d59 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -88,7 +88,7 @@ DI T warpReduce(T val) * number of warps in a block. All threads in the block must enter this * function together. cub also uses too many registers */ -template +template DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) { auto* sTemp = reinterpret_cast(smem); From 17023617a85c1b27b9f91bd31d18503d129e97c5 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Tue, 2 May 2023 01:21:58 -0700 Subject: [PATCH 09/20] add correct function comments --- cpp/include/raft/util/reduction.cuh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index 35b0802d59..1006d46c14 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -195,7 +195,7 @@ __inline__ __device__ void warpRankedReduce(T& val, * @brief 1-D block-level ranked reduction which returns the value and rank. * thread 0 will have valid result and rank(idx). * @param val input value - * @param smem shared memory region needed for storing intermediate results. It + * @param shbuf shared memory region needed for storing intermediate results. It * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` * @param idx index to be used as rank * @param reduce_op a binary reduction operation. @@ -240,11 +240,11 @@ __inline__ __device__ std::pair blockRankedReduce(T val, /** * @brief Executes a 1d binary block reduce * @param val binary value to be reduced across the thread block - * @param reduce_op a binary reduction operation. + * @param shmem memory needed for the reduction. 
It should be at least of size blockDim.x/WarpSize
  * @return only the thread0 will contain valid reduced result
  */
 template <int BLOCK_SIZE, typename i_t>
-DI i_t binaryBlockReduce(i_t val, i_t* shared)
+DI i_t binaryBlockReduce(i_t val, i_t* shmem)
 {
   static_assert(BLOCK_SIZE <= 1024);
   assert(val == 0 || val == 1);
@@ -252,10 +252,10 @@ DI i_t binaryBlockReduce(i_t val, i_t* shared)
   const uint32_t n_items = __popc(mask);
 
   // Each first thread of the warp
-  if (threadIdx.x % WarpSize == 0) shared[threadIdx.x / WarpSize] = n_items;
+  if (threadIdx.x % WarpSize == 0) shmem[threadIdx.x / WarpSize] = n_items;
   __syncthreads();
 
-  val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shared[threadIdx.x] : 0;
+  val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? shmem[threadIdx.x] : 0;
 
   if (threadIdx.x < WarpSize)
     return warpReduce(val);

From 85d0db1890abb054305bcc75c2eb964bb0ab478e Mon Sep 17 00:00:00 2001
From: akifcorduk
Date: Wed, 3 May 2023 01:41:44 -0700
Subject: [PATCH 10/20] add include comments and parentheses to if block

---
 cpp/include/raft/util/cuda_utils.cuh | 2 ++
 cpp/include/raft/util/reduction.cuh  | 9 ++++++---
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh
index e211d41e2f..0523dcc81c 100644
--- a/cpp/include/raft/util/cuda_utils.cuh
+++ b/cpp/include/raft/util/cuda_utils.cuh
@@ -23,6 +23,8 @@
 #include
 #include
 #include
+// For backward compatibility, we include the following headers. They contain
+// functionality that was previously contained in cuda_utils.cuh
 #include
 #include
 
diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh
index 1006d46c14..36829b33eb 100644
--- a/cpp/include/raft/util/reduction.cuh
+++ b/cpp/include/raft/util/reduction.cuh
@@ -252,15 +252,18 @@ DI i_t binaryBlockReduce(i_t val, i_t* shmem)
   const uint32_t n_items = __popc(mask);
 
   // Each first thread of the warp
-  if (threadIdx.x % WarpSize == 0) shmem[threadIdx.x / WarpSize] = n_items;
+  if (threadIdx.x % WarpSize == 0) { shmem[threadIdx.x / WarpSize] = n_items; }
   __syncthreads();
 
   val = (threadIdx.x < BLOCK_SIZE / WarpSize) ? 
shmem[threadIdx.x] : 0; - if (threadIdx.x < WarpSize) + if (threadIdx.x < WarpSize) { return warpReduce(val); - else // Only first warp gets the results + } + // Only first warp gets the results + else { return -1; + } } } // namespace raft \ No newline at end of file From 94138a642aefa0b8f761d15f305010b26adf3783 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 4 May 2023 00:50:46 -0700 Subject: [PATCH 11/20] use DI macro, add max_warps_per_block var, remove stale comment --- cpp/include/raft/util/reduction.cuh | 9 ++++----- cpp/test/util/reduction.cu | 8 +++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index 36829b33eb..bb4afd2217 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -111,7 +111,7 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) * @return only the thread0 will contain valid reduced result */ template -__device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) +DI T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) { #pragma unroll for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { @@ -137,7 +137,7 @@ __device__ inline T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) * @return only the thread0 will contain valid reduced result */ template -__inline__ __device__ i_t +DI i_t blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) { T* values = shbuf; @@ -158,7 +158,6 @@ blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) weight = values[lane]; idx = indices[lane]; } else { - // get the min if it is a max op, get the max if it is a min op weight = 0; idx = -1; } @@ -176,7 +175,7 @@ blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) * @return only the thread0 will contain valid reduced result */ template -__inline__ __device__ void warpRankedReduce(T& val, +DI void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{}) { @@ -202,7 +201,7 @@ __inline__ __device__ void warpRankedReduce(T& val, * @return only the thread0 will contain valid reduced result */ template -__inline__ __device__ std::pair blockRankedReduce(T val, +DI std::pair blockRankedReduce(T val, T* shbuf, i_t idx = threadIdx.x, ReduceLambda reduce_op = raft::min_op{}) diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index 4405e7ae3c..e632cc21d0 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -29,11 +29,13 @@ namespace raft::util { +constexpr int max_warps_per_block = 32; + template __global__ void test_reduction_kernel(const int* input, int* reduction_res, ReduceLambda reduce_op) { assert(gridDim.x == 1); - __shared__ int red_buf[raft::WarpSize]; + __shared__ int red_buf[max_warps_per_block]; int th_val = input[threadIdx.x]; th_val = raft::blockReduce(th_val, (char*)red_buf, reduce_op); if (threadIdx.x == 0) { reduction_res[0] = th_val; } @@ -46,7 +48,7 @@ __global__ void test_ranked_reduction_kernel(const int* input, ReduceLambda reduce_op) { assert(gridDim.x == 1); - __shared__ int red_buf[2 * raft::WarpSize]; + __shared__ int red_buf[2 * max_warps_per_block]; int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; auto result = raft::blockRankedReduce(th_val, red_buf, th_rank, reduce_op); @@ -59,7 +61,7 @@ __global__ void test_ranked_reduction_kernel(const int* input, __global__ void test_random_reduction_kernel(const int* input, int* reduction_res) { assert(gridDim.x == 1); - 
__shared__ int red_buf[2 * raft::WarpSize]; + __shared__ int red_buf[2 * max_warps_per_block]; raft::random::PCGenerator thread_rng(1234, threadIdx.x, 0); int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; From f1daedc3fd13aee87f871d88c4601521dee366f7 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 4 May 2023 01:04:27 -0700 Subject: [PATCH 12/20] use better comments and naming for weightedSelect and add static_asserts --- cpp/include/raft/util/reduction.cuh | 30 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index bb4afd2217..f7c02187bd 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -103,16 +103,18 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) } /** - * @brief 1-D warp-level weighted random reduction which returns the rank. - * used conditional probability to return the weighted random rank of the result. + * @brief warp-level weighted selection of an index. + * It selects an index with the given discrete probability + * distribution(represented by weights of each index) * @param rng random number generator, must have next_u32() function * @param weight weight of the rank/index. * @param idx index to be used as rank * @return only the thread0 will contain valid reduced result */ template -DI T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) +DI T warpWeightedSelect(rng_t& rng, T& weight, i_t& idx) { + static_assert(std::is_integral::value, "The type T must be an integral type."); #pragma unroll for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { T tmp_weight = shfl(weight, laneId() + offset); @@ -127,8 +129,9 @@ DI T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) } /** - * @brief 1-D block-level weighted random reduction which returns the rank. - * used conditional probability to return the weighted random rank of the result. + * @brief 1-D block-level weighted selection of an index. + * It selects an index with the given discrete probability + * distribution(represented by weights of each index) * @param rng random number generator, must have next_u32() function * @param shbuf shared memory region needed for storing intermediate results. 
It * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` @@ -137,14 +140,13 @@ DI T warpRandomReduce(rng_t& rng, T& weight, i_t& idx) * @return only the thread0 will contain valid reduced result */ template -DI i_t -blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) +DI i_t blockWeightedSelect(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) { T* values = shbuf; i_t* indices = (i_t*)&shbuf[WarpSize]; i_t wid = threadIdx.x / WarpSize; i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - warpRandomReduce(rng, weight, idx); // Each warp performs partial reduction + warpWeightedSelect(rng, weight, idx); // Each warp performs partial reduction i_t lane = laneId(); if (lane == 0) { values[wid] = weight; // Write reduced value to shared memory @@ -175,9 +177,7 @@ blockRandomReduce(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) * @return only the thread0 will contain valid reduced result */ template -DI void warpRankedReduce(T& val, - i_t& idx, - ReduceLambda reduce_op = raft::min_op{}) +DI void warpRankedReduce(T& val, i_t& idx, ReduceLambda reduce_op = raft::min_op{}) { #pragma unroll for (i_t offset = WarpSize / 2; offset > 0; offset /= 2) { @@ -197,14 +197,14 @@ DI void warpRankedReduce(T& val, * @param shbuf shared memory region needed for storing intermediate results. It * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` * @param idx index to be used as rank - * @param reduce_op a binary reduction operation. + * @param reduce_op binary min or max operation. * @return only the thread0 will contain valid reduced result */ template DI std::pair blockRankedReduce(T val, - T* shbuf, - i_t idx = threadIdx.x, - ReduceLambda reduce_op = raft::min_op{}) + T* shbuf, + i_t idx = threadIdx.x, + ReduceLambda reduce_op = raft::min_op{}) { T* values = shbuf; i_t* indices = (i_t*)&shbuf[WarpSize]; From 986810f75db4d2e07ca47c638509872aa1aabcfb Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 4 May 2023 04:18:27 -0700 Subject: [PATCH 13/20] adjust test names --- cpp/test/util/reduction.cu | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index e632cc21d0..f13805e7a0 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -58,14 +58,14 @@ __global__ void test_ranked_reduction_kernel(const int* input, } } -__global__ void test_random_reduction_kernel(const int* input, int* reduction_res) +__global__ void test_weighted_select_kernel(const int* input, int* reduction_res) { assert(gridDim.x == 1); __shared__ int red_buf[2 * max_warps_per_block]; raft::random::PCGenerator thread_rng(1234, threadIdx.x, 0); int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; - int result = raft::blockRandomReduce(thread_rng, red_buf, th_val, th_rank); + int result = raft::blockWeightedSelect(thread_rng, red_buf, th_val, th_rank); if (threadIdx.x == 0) { reduction_res[0] = result; } } @@ -115,14 +115,14 @@ struct reduction_launch { ASSERT_EQ(rank_d.value(stream), rank_ref_val); } - static void run_random(const rmm::device_uvector& arr_d, - int ref_val, - rmm::cuda_stream_view stream) + static void run_weighted(const rmm::device_uvector& arr_d, + int ref_val, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); const int block_dim = 64; const int grid_dim = 1; - test_random_reduction_kernel<<>>(arr_d.data(), ref_d.data()); + test_weighted_select_kernel<<>>(arr_d.data(), ref_d.data()); stream.synchronize(); 
RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); @@ -168,7 +168,7 @@ class ReductionTest : public testing::TestWithParam> { // NOLI reduction_launch::run_ranked(arr_d, 5, 15, raft::max_op{}, stream); reduction_launch::run_ranked(arr_d, 0, 26, raft::min_op{}, stream); // value 15 is for the current state of PCgenerator. adjust this if rng changes - reduction_launch::run_random(arr_d, 15, stream); + reduction_launch::run_weighted(arr_d, 15, stream); } void run_binary_reduction() { reduction_launch::run_binary(arr_d, 24, stream); } From 56e1deb324578f7c4236e5621c028d2fbb12cb9b Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 4 May 2023 06:55:33 -0700 Subject: [PATCH 14/20] fix compile error --- cpp/include/raft/util/reduction.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index f7c02187bd..1b6f0377c2 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -164,7 +164,7 @@ DI i_t blockWeightedSelect(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadId idx = -1; } __syncthreads(); - if (wid == 0) warpRandomReduce(rng, weight, idx); + if (wid == 0) warpWeightedSelect(rng, weight, idx); return idx; } From 3fec2355008b6113c8bc3f096042e6dc60de85cc Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Thu, 4 May 2023 07:24:51 -0700 Subject: [PATCH 15/20] Add detailed comment on blockWeightedSelect --- cpp/include/raft/util/reduction.cuh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index 1b6f0377c2..911c62da03 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -132,6 +132,15 @@ DI T warpWeightedSelect(rng_t& rng, T& weight, i_t& idx) * @brief 1-D block-level weighted selection of an index. * It selects an index with the given discrete probability * distribution(represented by weights of each index) + * + * Let w_i be the weight stored on thread i. We calculate the cumulative distribution function + * F_i = \sum_{k=0..i} weight_i. + * Sequentially, we could select one of the elements with with the desired probability using the + * following method. We can consider that each element has a subinterval assigned: [F_{i-1}, F_i). + * We generate a uniform random number in the [0, F_i) range, and check which subinterval it falls. + * We return idx corresponding to the selected subinterval. + * In parallel, we do a tree reduction and make a selection at every step when we combine two + * values. * @param rng random number generator, must have next_u32() function * @param shbuf shared memory region needed for storing intermediate results. It * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` From c154182817639e79271a20e597477186d7621e69 Mon Sep 17 00:00:00 2001 From: akifcorduk Date: Fri, 5 May 2023 01:06:20 -0700 Subject: [PATCH 16/20] add todo comment and remove command in the comment --- cpp/include/raft/util/reduction.cuh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh index 911c62da03..cdcac0911b 100644 --- a/cpp/include/raft/util/reduction.cuh +++ b/cpp/include/raft/util/reduction.cuh @@ -106,6 +106,7 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{}) * @brief warp-level weighted selection of an index. 
* It selects an index with the given discrete probability * distribution(represented by weights of each index) + * @todo benchmark whether a scan and then selecting within the ranges is more efficient. * @param rng random number generator, must have next_u32() function * @param weight weight of the rank/index. * @param idx index to be used as rank @@ -134,7 +135,7 @@ DI T warpWeightedSelect(rng_t& rng, T& weight, i_t& idx) * distribution(represented by weights of each index) * * Let w_i be the weight stored on thread i. We calculate the cumulative distribution function - * F_i = \sum_{k=0..i} weight_i. + * F_i = sum_{k=0..i} weight_i. * Sequentially, we could select one of the elements with with the desired probability using the * following method. We can consider that each element has a subinterval assigned: [F_{i-1}, F_i). * We generate a uniform random number in the [0, F_i) range, and check which subinterval it falls. From 7d39c35b8751092629a0d387022b5f167d184815 Mon Sep 17 00:00:00 2001 From: Tamas Bela Feher Date: Tue, 16 May 2023 14:20:25 +0200 Subject: [PATCH 17/20] move block and warp level random sampling to random::device namespace --- cpp/include/raft/random/device/sample.cuh | 104 ++++++++++++++++++++++ cpp/include/raft/util/reduction.cuh | 76 ---------------- cpp/test/util/reduction.cu | 15 ++-- 3 files changed, 112 insertions(+), 83 deletions(-) create mode 100644 cpp/include/raft/random/device/sample.cuh diff --git a/cpp/include/raft/random/device/sample.cuh b/cpp/include/raft/random/device/sample.cuh new file mode 100644 index 0000000000..6b279d1cb7 --- /dev/null +++ b/cpp/include/raft/random/device/sample.cuh @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include + +namespace raft::random::device { + +/** + * @brief warp-level random sampling of an index. + * It selects an index with the given discrete probability + * distribution(represented by weights of each index) + * @param rng random number generator, must have next_u32() function + * @param weight weight of the rank/index. + * @param idx index to be used as rank + * @return only the thread0 will contain valid reduced result + */ +template +DI T warp_random_sample(rng_t& rng, T& weight, i_t& idx) +{ + // Todo(#1491): benchmark whether a scan and then selecting within the ranges is more efficient. + static_assert(std::is_integral::value, "The type T must be an integral type."); +#pragma unroll + for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) { + T tmp_weight = shfl(weight, laneId() + offset); + i_t tmp_idx = shfl(idx, laneId() + offset); + T sum = (tmp_weight + weight); + weight = sum; + if (sum != 0) { + i_t rnd_number = (rng.next_u32() % sum); + if (rnd_number < tmp_weight) { idx = tmp_idx; } + } + } +} + +/** + * @brief 1-D block-level random sampling of an index. 
+ * It selects an index with the given discrete probability
+ * distribution (represented by weights of each index)
+ *
+ * Let w_i be the weight stored on thread i. We calculate the cumulative distribution function
+ * F_i = sum_{k=0..i} w_k.
+ * Sequentially, we could select one of the elements with the desired probability using the
+ * following method. We can consider that each element has a subinterval assigned: [F_{i-1}, F_i).
+ * We generate a uniform random number in the [0, F_{n-1}) range, and check which subinterval it falls into.
+ * We return idx corresponding to the selected subinterval.
+ * In parallel, we do a tree reduction and make a selection at every step when we combine two
+ * values.
+ * @param rng random number generator, must have next_u32() function
+ * @param shbuf shared memory region needed for storing intermediate results. It
+ * must at least be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize`
+ * @param weight weight of the rank/index.
+ * @param idx index to be used as rank
+ * @return only the thread0 will contain valid reduced result
+ */
+template
+DI i_t block_random_sample(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x)
+{
+  T* values    = shbuf;
+  i_t* indices = (i_t*)&shbuf[WarpSize];
+  i_t wid      = threadIdx.x / WarpSize;
+  i_t nWarps   = (blockDim.x + WarpSize - 1) / WarpSize;
+  warp_random_sample(rng, weight, idx);  // Each warp performs partial reduction
+  i_t lane = laneId();
+  if (lane == 0) {
+    values[wid]  = weight;  // Write reduced value to shared memory
+    indices[wid] = idx;     // Write reduced index to shared memory
+  }
+
+  __syncthreads();  // Wait for all partial reductions
+
+  // read from shared memory only if that warp existed
+  if (lane < nWarps) {
+    weight = values[lane];
+    idx    = indices[lane];
+  } else {
+    weight = 0;
+    idx    = -1;
+  }
+  __syncthreads();
+  if (wid == 0) warp_random_sample(rng, weight, idx);
+  return idx;
+}
+
+}
\ No newline at end of file
diff --git a/cpp/include/raft/util/reduction.cuh b/cpp/include/raft/util/reduction.cuh
index cdcac0911b..74c57b4ca2 100644
--- a/cpp/include/raft/util/reduction.cuh
+++ b/cpp/include/raft/util/reduction.cuh
@@ -102,82 +102,6 @@ DI T blockReduce(T val, char* smem, ReduceLambda reduce_op = raft::add_op{})
   return warpReduce(val, reduce_op);
 }
 
-/**
- * @brief warp-level weighted selection of an index.
- * It selects an index with the given discrete probability
- * distribution(represented by weights of each index)
- * @todo benchmark whether a scan and then selecting within the ranges is more efficient.
- * @param rng random number generator, must have next_u32() function
- * @param weight weight of the rank/index.
- * @param idx index to be used as rank
- * @return only the thread0 will contain valid reduced result
- */
-template
-DI T warpWeightedSelect(rng_t& rng, T& weight, i_t& idx)
-{
-  static_assert(std::is_integral::value, "The type T must be an integral type.");
-#pragma unroll
-  for (i_t offset = raft::WarpSize / 2; offset > 0; offset /= 2) {
-    T tmp_weight = shfl(weight, laneId() + offset);
-    i_t tmp_idx  = shfl(idx, laneId() + offset);
-    T sum        = (tmp_weight + weight);
-    weight       = sum;
-    if (sum != 0) {
-      i_t rnd_number = (rng.next_u32() % sum);
-      if (rnd_number < tmp_weight) { idx = tmp_idx; }
-    }
-  }
-}
-
-/**
- * @brief 1-D block-level weighted selection of an index.
- * It selects an index with the given discrete probability
- * distribution(represented by weights of each index)
- *
- * Let w_i be the weight stored on thread i.
We calculate the cumulative distribution function - * F_i = sum_{k=0..i} weight_i. - * Sequentially, we could select one of the elements with with the desired probability using the - * following method. We can consider that each element has a subinterval assigned: [F_{i-1}, F_i). - * We generate a uniform random number in the [0, F_i) range, and check which subinterval it falls. - * We return idx corresponding to the selected subinterval. - * In parallel, we do a tree reduction and make a selection at every step when we combine two - * values. - * @param rng random number generator, must have next_u32() function - * @param shbuf shared memory region needed for storing intermediate results. It - * must alteast be of size: `(sizeof(T) + sizeof(i_t)) * WarpSize` - * @param weight weight of the rank/index. - * @param idx index to be used as rank - * @return only the thread0 will contain valid reduced result - */ -template -DI i_t blockWeightedSelect(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadIdx.x) -{ - T* values = shbuf; - i_t* indices = (i_t*)&shbuf[WarpSize]; - i_t wid = threadIdx.x / WarpSize; - i_t nWarps = (blockDim.x + WarpSize - 1) / WarpSize; - warpWeightedSelect(rng, weight, idx); // Each warp performs partial reduction - i_t lane = laneId(); - if (lane == 0) { - values[wid] = weight; // Write reduced value to shared memory - indices[wid] = idx; // Write reduced value to shared memory - } - - __syncthreads(); // Wait for all partial reductions - - // read from shared memory only if that warp existed - if (lane < nWarps) { - weight = values[lane]; - idx = indices[lane]; - } else { - weight = 0; - idx = -1; - } - __syncthreads(); - if (wid == 0) warpWeightedSelect(rng, weight, idx); - return idx; -} - /** * @brief 1-D warp-level ranked reduction which returns the value and rank. * thread 0 will have valid result and rank(idx). 
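The comment moved here describes selection via the CDF F_i. A sequential host-side reference makes the rule concrete and is handy for validating the device results; the function name is illustrative, and it draws a single variate from [0, F_{n-1}).

#include <cstdint>

// Returns the index whose subinterval [F_{i-1}, F_i) contains the draw, i.e.
// index i is selected with probability w_i / F_{n-1}. Requires total > 0.
int weighted_select_reference(const int* w, int n, uint32_t rnd)
{
  int total = 0;
  for (int i = 0; i < n; ++i) { total += w[i]; }  // total == F_{n-1}
  int r      = static_cast<int>(rnd % static_cast<uint32_t>(total));  // uniform in [0, total)
  int cumsum = 0;
  for (int i = 0; i < n; ++i) {
    cumsum += w[i];                // cumsum == F_i
    if (r < cumsum) { return i; }  // r fell in [F_{i-1}, F_i)
  }
  return n - 1;  // unreachable when total > 0
}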
diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index f13805e7a0..17deaf99eb 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -16,6 +16,7 @@ #include "../test_utils.cuh" +#include #include #include @@ -58,14 +59,14 @@ __global__ void test_ranked_reduction_kernel(const int* input, } } -__global__ void test_weighted_select_kernel(const int* input, int* reduction_res) +__global__ void test_block_random_sample_kernel(const int* input, int* reduction_res) { assert(gridDim.x == 1); __shared__ int red_buf[2 * max_warps_per_block]; raft::random::PCGenerator thread_rng(1234, threadIdx.x, 0); int th_val = input[threadIdx.x]; int th_rank = threadIdx.x; - int result = raft::blockWeightedSelect(thread_rng, red_buf, th_val, th_rank); + int result = raft::random::device::block_random_sample(thread_rng, red_buf, th_val, th_rank); if (threadIdx.x == 0) { reduction_res[0] = result; } } @@ -115,14 +116,14 @@ struct reduction_launch { ASSERT_EQ(rank_d.value(stream), rank_ref_val); } - static void run_weighted(const rmm::device_uvector& arr_d, - int ref_val, - rmm::cuda_stream_view stream) + static void run_random_sample(const rmm::device_uvector& arr_d, + int ref_val, + rmm::cuda_stream_view stream) { rmm::device_scalar ref_d(stream); const int block_dim = 64; const int grid_dim = 1; - test_weighted_select_kernel<<>>(arr_d.data(), ref_d.data()); + test_block_random_sample_kernel<<>>(arr_d.data(), ref_d.data()); stream.synchronize(); RAFT_CUDA_TRY(cudaPeekAtLastError()); ASSERT_EQ(ref_d.value(stream), ref_val); @@ -168,7 +169,7 @@ class ReductionTest : public testing::TestWithParam> { // NOLI reduction_launch::run_ranked(arr_d, 5, 15, raft::max_op{}, stream); reduction_launch::run_ranked(arr_d, 0, 26, raft::min_op{}, stream); // value 15 is for the current state of PCgenerator. adjust this if rng changes - reduction_launch::run_weighted(arr_d, 15, stream); + reduction_launch::run_random_sample(arr_d, 15, stream); } void run_binary_reduction() { reduction_launch::run_binary(arr_d, 24, stream); } From f72aae8185425ec17c6a87696521da08f3d5b187 Mon Sep 17 00:00:00 2001 From: Tamas Bela Feher Date: Tue, 16 May 2023 14:37:49 +0200 Subject: [PATCH 18/20] move random/device/sample_device.cuh to random/sample_devic.cuh --- .../raft/random/{device/sample.cuh => sample_device.cuh} | 0 cpp/test/util/reduction.cu | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename cpp/include/raft/random/{device/sample.cuh => sample_device.cuh} (100%) diff --git a/cpp/include/raft/random/device/sample.cuh b/cpp/include/raft/random/sample_device.cuh similarity index 100% rename from cpp/include/raft/random/device/sample.cuh rename to cpp/include/raft/random/sample_device.cuh diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index 17deaf99eb..5bec18fb86 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -16,8 +16,8 @@ #include "../test_utils.cuh" -#include #include +#include #include #include From 4d56e47ea56a0654f832a6740612396a65523511 Mon Sep 17 00:00:00 2001 From: Tamas Bela Feher Date: Wed, 17 May 2023 17:14:27 +0200 Subject: [PATCH 19/20] Revert "move random/device/sample_device.cuh to random/sample_devic.cuh" This reverts commit f72aae8185425ec17c6a87696521da08f3d5b187. 
--- .../raft/random/{sample_device.cuh => device/sample.cuh} | 0 cpp/test/util/reduction.cu | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename cpp/include/raft/random/{sample_device.cuh => device/sample.cuh} (100%) diff --git a/cpp/include/raft/random/sample_device.cuh b/cpp/include/raft/random/device/sample.cuh similarity index 100% rename from cpp/include/raft/random/sample_device.cuh rename to cpp/include/raft/random/device/sample.cuh diff --git a/cpp/test/util/reduction.cu b/cpp/test/util/reduction.cu index 5bec18fb86..17deaf99eb 100644 --- a/cpp/test/util/reduction.cu +++ b/cpp/test/util/reduction.cu @@ -16,8 +16,8 @@ #include "../test_utils.cuh" +#include #include -#include #include #include From c879e48277eefd1d50c380de14831d4da4745378 Mon Sep 17 00:00:00 2001 From: Tamas Bela Feher Date: Thu, 18 May 2023 00:14:20 +0200 Subject: [PATCH 20/20] Fix style --- cpp/include/raft/random/device/sample.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/raft/random/device/sample.cuh b/cpp/include/raft/random/device/sample.cuh index 6b279d1cb7..f08db3e0a2 100644 --- a/cpp/include/raft/random/device/sample.cuh +++ b/cpp/include/raft/random/device/sample.cuh @@ -101,4 +101,4 @@ DI i_t block_random_sample(rng_t rng, T* shbuf, T weight = 1, i_t idx = threadId return idx; } -} \ No newline at end of file +} // namespace raft::random::device \ No newline at end of file
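With the series applied, weighted sampling lives in raft::random::device and the ranked and binary reductions stay in raft/util/reduction.cuh. A minimal end-to-end sketch of the sampling entry point, mirroring the test kernel above; the kernel name, seed, and launch shape are illustrative.

#include <raft/random/device/sample.cuh>
#include <raft/random/rng_device.cuh>
#include <raft/util/cuda_utils.cuh>

// Draw one thread index per block, with probability proportional to the
// weight each thread contributes. Only thread 0 reads the result.
__global__ void sample_index_sketch(const int* weights, int* out)
{
  __shared__ int shbuf[2 * raft::WarpSize];  // warp-level values, then indices
  raft::random::PCGenerator rng(1234, threadIdx.x, 0);
  int idx = raft::random::device::block_random_sample(
    rng, shbuf, weights[threadIdx.x], static_cast<int>(threadIdx.x));
  if (threadIdx.x == 0) { *out = idx; }
}

Launched as sample_index_sketch<<<1, 64>>>(weights, out), this matches the grid_dim = 1, block_dim = 64 configuration the test uses.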