From 9ec9b7061e1a28c0b55907232bcd226477e0ccb5 Mon Sep 17 00:00:00 2001 From: Louis Sugy Date: Thu, 8 Dec 2022 17:08:47 +0100 Subject: [PATCH] Shared-memory-cached kernel for `reduce_cols_by_key` to limit atomic conflicts (#1050) `reduce_cols_by_key` is used in k-means to compute the total weight in each cluster. When the number of clusters is really small, atomic conflicts make the reduction effectively sequential. This can be avoided by using a shared-memory cache. This PR adds a cached kernel and some heuristics to decide when to use it, resulting in large speedups for `reduce_cols_by_keys` and in turn ~15% speedups in some cases for k-means. Authors: - Louis Sugy (https://github.com/Nyrio) Approvers: - Tamas Bela Feher (https://github.com/tfeher) URL: https://github.com/rapidsai/raft/pull/1050 --- cpp/bench/CMakeLists.txt | 1 + cpp/bench/linalg/reduce_cols_by_key.cu | 78 +++++++++++++++ .../raft/cluster/detail/kmeans_common.cuh | 1 - .../raft/linalg/detail/reduce_cols_by_key.cuh | 73 ++++++++++++-- .../raft/linalg/reduce_cols_by_key.cuh | 25 +++-- cpp/test/linalg/reduce_cols_by_key.cu | 97 +++++++++++-------- 6 files changed, 217 insertions(+), 58 deletions(-) create mode 100644 cpp/bench/linalg/reduce_cols_by_key.cu diff --git a/cpp/bench/CMakeLists.txt b/cpp/bench/CMakeLists.txt index 4e6b6ceb40..99606dd2e9 100644 --- a/cpp/bench/CMakeLists.txt +++ b/cpp/bench/CMakeLists.txt @@ -96,6 +96,7 @@ if(BUILD_BENCH) bench/linalg/matrix_vector_op.cu bench/linalg/norm.cu bench/linalg/normalize.cu + bench/linalg/reduce_cols_by_key.cu bench/linalg/reduce_rows_by_key.cu bench/linalg/reduce.cu bench/main.cpp diff --git a/cpp/bench/linalg/reduce_cols_by_key.cu b/cpp/bench/linalg/reduce_cols_by_key.cu new file mode 100644 index 0000000000..43aeb69ab0 --- /dev/null +++ b/cpp/bench/linalg/reduce_cols_by_key.cu @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +namespace raft::bench::linalg { + +template +struct rcbk_params { + IdxT rows, cols; + IdxT keys; +}; + +template +inline auto operator<<(std::ostream& os, const rcbk_params& p) -> std::ostream& +{ + os << p.rows << "#" << p.cols << "#" << p.keys; + return os; +} + +template +struct reduce_cols_by_key : public fixture { + reduce_cols_by_key(const rcbk_params& p) + : params(p), in(p.rows * p.cols, stream), out(p.rows * p.keys, stream), keys(p.cols, stream) + { + raft::random::RngState rng{42}; + raft::random::uniformInt(rng, keys.data(), p.cols, (KeyT)0, (KeyT)p.keys, stream); + } + + void run_benchmark(::benchmark::State& state) override + { + std::ostringstream label_stream; + label_stream << params; + state.SetLabel(label_stream.str()); + + loop_on_state(state, [this]() { + raft::linalg::reduce_cols_by_key( + in.data(), keys.data(), out.data(), params.rows, params.cols, params.keys, stream, false); + }); + } + + protected: + rcbk_params params; + rmm::device_uvector in, out; + rmm::device_uvector keys; +}; // struct reduce_cols_by_key + +const std::vector> rcbk_inputs_i32 = + raft::util::itertools::product>( + {1, 10, 100, 1000}, {1000, 10000, 100000}, {8, 32, 128, 512, 2048}); +const std::vector> rcbk_inputs_i64 = + raft::util::itertools::product>( + {1, 10, 100, 1000}, {1000, 10000, 100000}, {8, 32, 128, 512, 2048}); + +RAFT_BENCH_REGISTER((reduce_cols_by_key), "", rcbk_inputs_i32); +RAFT_BENCH_REGISTER((reduce_cols_by_key), "", rcbk_inputs_i32); +RAFT_BENCH_REGISTER((reduce_cols_by_key), "", rcbk_inputs_i64); +RAFT_BENCH_REGISTER((reduce_cols_by_key), "", rcbk_inputs_i64); + +} // namespace raft::bench::linalg diff --git a/cpp/include/raft/cluster/detail/kmeans_common.cuh b/cpp/include/raft/cluster/detail/kmeans_common.cuh index 2973be8c23..b1ef43fea7 100644 --- a/cpp/include/raft/cluster/detail/kmeans_common.cuh +++ b/cpp/include/raft/cluster/detail/kmeans_common.cuh @@ -37,7 +37,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp/include/raft/linalg/detail/reduce_cols_by_key.cuh b/cpp/include/raft/linalg/detail/reduce_cols_by_key.cuh index 450fb415e2..a85e04acca 100644 --- a/cpp/include/raft/linalg/detail/reduce_cols_by_key.cuh +++ b/cpp/include/raft/linalg/detail/reduce_cols_by_key.cuh @@ -29,12 +29,12 @@ namespace detail { ///@todo: specialize this to support shared-mem based atomics template -__global__ void reduce_cols_by_key_kernel( +__global__ void reduce_cols_by_key_direct_kernel( const T* data, const KeyIteratorT keys, T* out, IdxType nrows, IdxType ncols, IdxType nkeys) { typedef typename std::iterator_traits::value_type KeyType; - IdxType idx = blockIdx.x * blockDim.x + threadIdx.x; + IdxType idx = static_cast(blockIdx.x) * blockDim.x + threadIdx.x; if (idx >= (nrows * ncols)) return; ///@todo: yikes! use fast-int-div IdxType colId = idx % ncols; @@ -43,6 +43,38 @@ __global__ void reduce_cols_by_key_kernel( raft::myAtomicAdd(out + rowId * nkeys + key, data[idx]); } +template +__global__ void reduce_cols_by_key_cached_kernel( + const T* data, const KeyIteratorT keys, T* out, IdxType nrows, IdxType ncols, IdxType nkeys) +{ + typedef typename std::iterator_traits::value_type KeyType; + extern __shared__ char smem[]; + T* out_cache = reinterpret_cast(smem); + + // Initialize the shared memory accumulators to 0. + for (IdxType idx = threadIdx.x; idx < nrows * nkeys; idx += blockDim.x) { + out_cache[idx] = T{0}; + } + __syncthreads(); + + // Accumulate in shared memory + for (IdxType idx = static_cast(blockIdx.x) * blockDim.x + threadIdx.x; + idx < nrows * ncols; + idx += blockDim.x * static_cast(gridDim.x)) { + IdxType colId = idx % ncols; + IdxType rowId = idx / ncols; + KeyType key = keys[colId]; + raft::myAtomicAdd(out_cache + rowId * nkeys + key, data[idx]); + } + + // Add the shared-memory accumulators to the global results. + __syncthreads(); + for (IdxType idx = threadIdx.x; idx < nrows * nkeys; idx += blockDim.x) { + T val = out_cache[idx]; + if (val != T{0}) { raft::myAtomicAdd(out + idx, val); } + } +} + /** * @brief Computes the sum-reduction of matrix columns for each given key * @tparam T the input data type (as well as the output reduced matrix) @@ -60,6 +92,7 @@ __global__ void reduce_cols_by_key_kernel( * @param ncols number of columns in the input data * @param nkeys number of unique keys in the keys array * @param stream cuda stream to launch the kernel onto + * @param reset_sums Whether to reset the output sums to zero before reducing */ template void reduce_cols_by_key(const T* data, @@ -68,16 +101,42 @@ void reduce_cols_by_key(const T* data, IdxType nrows, IdxType ncols, IdxType nkeys, - cudaStream_t stream) + cudaStream_t stream, + bool reset_sums) { typedef typename std::iterator_traits::value_type KeyType; - RAFT_CUDA_TRY(cudaMemsetAsync(out, 0, sizeof(T) * nrows * nkeys, stream)); - constexpr int TPB = 256; - int nblks = (int)raft::ceildiv(nrows * ncols, TPB); - reduce_cols_by_key_kernel<<>>(data, keys, out, nrows, ncols, nkeys); + RAFT_EXPECTS(static_cast(nrows) * static_cast(ncols) <= + static_cast(std::numeric_limits::max()), + "Index type too small to represent indices in the input array."); + RAFT_EXPECTS(static_cast(nrows) * static_cast(nkeys) <= + static_cast(std::numeric_limits::max()), + "Index type too small to represent indices in the output array."); + + // Memset the output to zero to use atomics-based reduction. + if (reset_sums) { RAFT_CUDA_TRY(cudaMemsetAsync(out, 0, sizeof(T) * nrows * nkeys, stream)); } + + // The cached version is used when the cache fits in shared memory and the number of input + // elements is above a threshold (the cached version is slightly slower for small input arrays, + // and orders of magnitude faster for large input arrays). + size_t cache_size = static_cast(nrows * nkeys) * sizeof(T); + if (cache_size <= 49152ull && nrows * ncols >= IdxType{8192}) { + constexpr int TPB = 256; + int n_sm = raft::getMultiProcessorCount(); + int target_nblks = 4 * n_sm; + int max_nblks = raft::ceildiv(nrows * ncols, TPB); + int nblks = std::min(target_nblks, max_nblks); + reduce_cols_by_key_cached_kernel<<>>( + data, keys, out, nrows, ncols, nkeys); + } else { + constexpr int TPB = 256; + int nblks = raft::ceildiv(nrows * ncols, TPB); + reduce_cols_by_key_direct_kernel<<>>( + data, keys, out, nrows, ncols, nkeys); + } RAFT_CUDA_TRY(cudaPeekAtLastError()); } + }; // end namespace detail }; // end namespace linalg }; // end namespace raft diff --git a/cpp/include/raft/linalg/reduce_cols_by_key.cuh b/cpp/include/raft/linalg/reduce_cols_by_key.cuh index 436fce26fd..7b0ad2f984 100644 --- a/cpp/include/raft/linalg/reduce_cols_by_key.cuh +++ b/cpp/include/raft/linalg/reduce_cols_by_key.cuh @@ -43,6 +43,7 @@ namespace linalg { * @param ncols number of columns in the input data * @param nkeys number of unique keys in the keys array * @param stream cuda stream to launch the kernel onto + * @param reset_sums Whether to reset the output sums to zero before reducing */ template void reduce_cols_by_key(const T* data, @@ -51,9 +52,10 @@ void reduce_cols_by_key(const T* data, IdxType nrows, IdxType ncols, IdxType nkeys, - cudaStream_t stream) + cudaStream_t stream, + bool reset_sums = true) { - detail::reduce_cols_by_key(data, keys, out, nrows, ncols, nkeys, stream); + detail::reduce_cols_by_key(data, keys, out, nrows, ncols, nkeys, stream, reset_sums); } /** @@ -76,7 +78,9 @@ void reduce_cols_by_key(const T* data, * monotonically increasing keys array. * @param[out] out the output reduced raft::device_matrix_view along columns (dim = nrows x nkeys). * This will be assumed to be in row-major layout - * @param[in] nkeys number of unique keys in the keys array + * @param[in] nkeys Number of unique keys in the keys array. By default, inferred from the number of + * columns of out + * @param[in] reset_sums Whether to reset the output sums to zero before reducing */ template void reduce_cols_by_key( @@ -84,10 +88,16 @@ void reduce_cols_by_key( raft::device_matrix_view data, raft::device_vector_view keys, raft::device_matrix_view out, - IndexType nkeys) + IndexType nkeys = 0, + bool reset_sums = true) { - RAFT_EXPECTS(out.extent(0) == data.extent(0) && out.extent(1) == nkeys, - "Output is not of size nrows * nkeys"); + if (nkeys > 0) { + RAFT_EXPECTS(out.extent(1) == nkeys, "Output doesn't have nkeys columns"); + } else { + nkeys = out.extent(1); + } + RAFT_EXPECTS(out.extent(0) == data.extent(0), + "Output doesn't have the same number of rows as input"); RAFT_EXPECTS(keys.extent(0) == data.extent(1), "Keys is not of size ncols"); reduce_cols_by_key(data.data_handle(), @@ -96,7 +106,8 @@ void reduce_cols_by_key( data.extent(0), data.extent(1), nkeys, - handle.get_stream()); + handle.get_stream(), + reset_sums); } /** @} */ // end of group reduce_cols_by_key diff --git a/cpp/test/linalg/reduce_cols_by_key.cu b/cpp/test/linalg/reduce_cols_by_key.cu index 63afbe2fed..6fa616ba8c 100644 --- a/cpp/test/linalg/reduce_cols_by_key.cu +++ b/cpp/test/linalg/reduce_cols_by_key.cu @@ -20,27 +20,28 @@ #include #include #include +#include namespace raft { namespace linalg { -template +template void naiveReduceColsByKey(const T* in, - const uint32_t* keys, + const KeyT* keys, T* out_ref, - uint32_t nrows, - uint32_t ncols, - uint32_t nkeys, + IdxT nrows, + IdxT ncols, + IdxT nkeys, cudaStream_t stream) { - std::vector h_keys(ncols, 0u); + std::vector h_keys(ncols, 0u); raft::copy(&(h_keys[0]), keys, ncols, stream); std::vector h_in(nrows * ncols); raft::copy(&(h_in[0]), in, nrows * ncols, stream); raft::interruptible::synchronize(stream); std::vector out(nrows * nkeys, T(0)); - for (uint32_t i = 0; i < nrows; ++i) { - for (uint32_t j = 0; j < ncols; ++j) { + for (IdxT i = 0; i < nrows; ++i) { + for (IdxT j = 0; j < ncols; ++j) { out[i * nkeys + h_keys[j]] += h_in[i * ncols + j]; } } @@ -48,29 +49,31 @@ void naiveReduceColsByKey(const T* in, raft::interruptible::synchronize(stream); } -template +template struct ReduceColsInputs { T tolerance; - uint32_t rows; - uint32_t cols; - uint32_t nkeys; + IdxT rows; + IdxT cols; + IdxT nkeys; unsigned long long int seed; }; -template -::std::ostream& operator<<(::std::ostream& os, const ReduceColsInputs& dims) +template +::std::ostream& operator<<(::std::ostream& os, const ReduceColsInputs& p) { + os << "{" << p.tolerance << "," << p.rows << "," << p.cols << "," << p.nkeys << "," << p.seed + << "}"; return os; } -template -class ReduceColsTest : public ::testing::TestWithParam> { +template +class ReduceColsTest : public ::testing::TestWithParam> { protected: ReduceColsTest() : in(0, stream), out_ref(0, stream), out(0, stream), keys(0, stream) {} void SetUp() override { - params = ::testing::TestWithParam>::GetParam(); + params = ::testing::TestWithParam>::GetParam(); raft::random::RngState r(params.seed); raft::handle_t handle; auto stream = handle.get_stream(); @@ -82,45 +85,53 @@ class ReduceColsTest : public ::testing::TestWithParam> { out_ref.resize(nrows * nkeys, stream); out.resize(nrows * nkeys, stream); uniform(handle, r, in.data(), nrows * ncols, T(-1.0), T(1.0)); - uniformInt(handle, r, keys.data(), ncols, 0u, params.nkeys); + uniformInt(handle, r, keys.data(), ncols, KeyT{0}, static_cast(params.nkeys)); naiveReduceColsByKey(in.data(), keys.data(), out_ref.data(), nrows, ncols, nkeys, stream); auto input_view = raft::make_device_matrix_view(in.data(), nrows, ncols); auto output_view = raft::make_device_matrix_view(out.data(), nrows, nkeys); - auto keys_view = raft::make_device_vector_view(keys.data(), ncols); + auto keys_view = raft::make_device_vector_view(keys.data(), ncols); reduce_cols_by_key(handle, input_view, keys_view, output_view, nkeys); raft::interruptible::synchronize(stream); } protected: cudaStream_t stream = 0; - ReduceColsInputs params; + ReduceColsInputs params; rmm::device_uvector in, out_ref, out; - rmm::device_uvector keys; + rmm::device_uvector keys; }; -const std::vector> inputsf = {{0.0001f, 128, 32, 6, 1234ULL}, - {0.0005f, 121, 63, 10, 1234ULL}}; -typedef ReduceColsTest ReduceColsTestF; -TEST_P(ReduceColsTestF, Result) -{ - ASSERT_TRUE(raft::devArrMatch(out_ref.data(), - out.data(), - params.rows * params.nkeys, - raft::CompareApprox(params.tolerance))); -} -INSTANTIATE_TEST_CASE_P(ReduceColsTests, ReduceColsTestF, ::testing::ValuesIn(inputsf)); +#define RCBK_TEST(test_type, test_name, test_inputs) \ + typedef RAFT_DEPAREN(test_type) test_name; \ + TEST_P(test_name, Result) \ + { \ + ASSERT_TRUE(raft::devArrMatch(out_ref.data(), \ + out.data(), \ + params.rows* params.nkeys, \ + raft::CompareApprox(params.tolerance))); \ + } \ + INSTANTIATE_TEST_CASE_P(ReduceColsTests, test_name, ::testing::ValuesIn(test_inputs)) -const std::vector> inputsd2 = {{0.0000001, 128, 32, 6, 1234ULL}, - {0.0000001, 121, 63, 10, 1234ULL}}; -typedef ReduceColsTest ReduceColsTestD; -TEST_P(ReduceColsTestD, Result) -{ - ASSERT_TRUE(raft::devArrMatch(out_ref.data(), - out.data(), - params.rows * params.nkeys, - raft::CompareApprox(params.tolerance))); -} -INSTANTIATE_TEST_CASE_P(ReduceColsTests, ReduceColsTestD, ::testing::ValuesIn(inputsd2)); +const std::vector> inputsf_i32 = + raft::util::itertools::product>( + {0.001f}, {1, 9, 63, 1024}, {1234, 9999, 101010}, {7, 42, 127, 515, 2022}, {1234ULL}); +const std::vector> inputsd_i32 = + raft::util::itertools::product>( + {0.000001}, {1, 9, 63, 1024}, {1234, 9999, 101010}, {7, 42, 127, 515, 2022}, {1234ULL}); +const std::vector> inputsf_u32 = + raft::util::itertools::product>({0.001f}, + {1u, 9u, 63u, 1024u}, + {1234u, 9999u, 101010u}, + {7u, 42u, 127u, 515u, 2022u}, + {1234ULL}); +const std::vector> inputsf_i64 = + raft::util::itertools::product>( + {0.001f}, {1, 9, 63, 1024}, {1234, 9999, 101010}, {7, 42, 127, 515, 2022}, {1234ULL}); + +RCBK_TEST((ReduceColsTest), ReduceColsTestFU32I32, inputsf_i32); +RCBK_TEST((ReduceColsTest), ReduceColsTestDU32I32, inputsd_i32); +RCBK_TEST((ReduceColsTest), ReduceColsTestFI32U32, inputsf_u32); +RCBK_TEST((ReduceColsTest), ReduceColsTestFI32I64, inputsf_i64); } // end namespace linalg } // end namespace raft