Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into fea-ann-bench-optional-gpu-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
achirkin authored May 15, 2024
2 parents a15550c + 92d4301 commit c27564e
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 106 deletions.
5 changes: 4 additions & 1 deletion cpp/bench/ann/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ if(RAFT_ANN_BENCH_USE_RAFT_CAGRA)
RAFT_CAGRA
PATH
bench/ann/src/raft/raft_benchmark.cu
$<$<BOOL:${RAFT_ANN_BENCH_USE_RAFT_CAGRA}>:bench/ann/src/raft/raft_cagra.cu>
$<$<BOOL:${RAFT_ANN_BENCH_USE_RAFT_CAGRA}>:bench/ann/src/raft/raft_cagra_float.cu>
$<$<BOOL:${RAFT_ANN_BENCH_USE_RAFT_CAGRA}>:bench/ann/src/raft/raft_cagra_half.cu>
$<$<BOOL:${RAFT_ANN_BENCH_USE_RAFT_CAGRA}>:bench/ann/src/raft/raft_cagra_int8_t.cu>
$<$<BOOL:${RAFT_ANN_BENCH_USE_RAFT_CAGRA}>:bench/ann/src/raft/raft_cagra_uint8_t.cu>
LINKS
raft::compiled
)
Expand Down
35 changes: 21 additions & 14 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,16 @@ void bench_search(::benchmark::State& state,
/**
* Each thread will manage its own outputs
*/
std::shared_ptr<buf<float>> distances =
std::make_shared<buf<float>>(current_algo_props->query_memory_type, k * query_set_size);
std::shared_ptr<buf<std::size_t>> neighbors =
std::make_shared<buf<std::size_t>>(current_algo_props->query_memory_type, k * query_set_size);
using index_type = size_t;
constexpr size_t kAlignResultBuf = 64;
size_t result_elem_count = k * query_set_size;
result_elem_count =
((result_elem_count + kAlignResultBuf - 1) / kAlignResultBuf) * kAlignResultBuf;
auto& result_buf =
get_result_buffer_from_global_pool(result_elem_count * (sizeof(float) + sizeof(index_type)));
auto* neighbors_ptr =
reinterpret_cast<index_type*>(result_buf.data(current_algo_props->query_memory_type));
auto* distances_ptr = reinterpret_cast<float*>(neighbors_ptr + result_elem_count);

{
nvtx_case nvtx{state.name()};
Expand All @@ -305,8 +311,8 @@ void bench_search(::benchmark::State& state,
algo->search(query_set + batch_offset * dataset->dim(),
n_queries,
k,
neighbors->data + out_offset * k,
distances->data + out_offset * k);
neighbors_ptr + out_offset * k,
distances_ptr + out_offset * k);
} catch (const std::exception& e) {
state.SkipWithError("Benchmark loop: " + std::string(e.what()));
break;
Expand Down Expand Up @@ -338,12 +344,13 @@ void bench_search(::benchmark::State& state,
// Each thread calculates recall on their partition of queries.
// evaluate recall
if (dataset->max_k() >= k) {
const std::int32_t* gt = dataset->gt_set();
const std::uint32_t max_k = dataset->max_k();
buf<std::size_t> neighbors_host = neighbors->move(MemoryType::Host);
std::size_t rows = std::min(queries_processed, query_set_size);
std::size_t match_count = 0;
std::size_t total_count = rows * static_cast<size_t>(k);
const std::int32_t* gt = dataset->gt_set();
const std::uint32_t max_k = dataset->max_k();
result_buf.transfer_data(MemoryType::Host, current_algo_props->query_memory_type);
auto* neighbors_host = reinterpret_cast<index_type*>(result_buf.data(MemoryType::Host));
std::size_t rows = std::min(queries_processed, query_set_size);
std::size_t match_count = 0;
std::size_t total_count = rows * static_cast<size_t>(k);

// We go through the groundtruth with same stride as the benchmark loop.
size_t out_offset = 0;
Expand All @@ -354,7 +361,7 @@ void bench_search(::benchmark::State& state,
size_t i_out_idx = out_offset + i;
if (i_out_idx < rows) {
for (std::uint32_t j = 0; j < k; j++) {
auto act_idx = std::int32_t(neighbors_host.data[i_out_idx * k + j]);
auto act_idx = std::int32_t(neighbors_host[i_out_idx * k + j]);
for (std::uint32_t l = 0; l < k; l++) {
auto exp_idx = gt[i_orig_idx * max_k + l];
if (act_idx == exp_idx) {
Expand Down Expand Up @@ -717,7 +724,7 @@ inline auto run_main(int argc, char** argv) -> int
// to a shared library it depends on (dynamic benchmark executable).
current_algo.reset();
current_algo_props.reset();
reset_global_stream_pool();
reset_global_device_resources();
return 0;
}
}; // namespace raft::bench::ann
141 changes: 88 additions & 53 deletions cpp/bench/ann/src/common/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,57 +56,6 @@ inline thread_local int benchmark_thread_id = 0;
*/
inline thread_local int benchmark_n_threads = 1;

template <typename T>
struct buf {
MemoryType memory_type;
std::size_t size;
T* data;
buf(MemoryType memory_type, std::size_t size)
: memory_type(memory_type), size(size), data(nullptr)
{
switch (memory_type) {
#ifndef BUILD_CPU_ONLY
case MemoryType::Device: {
cudaMalloc(reinterpret_cast<void**>(&data), size * sizeof(T));
cudaMemset(data, 0, size * sizeof(T));
} break;
#endif
default: {
data = reinterpret_cast<T*>(malloc(size * sizeof(T)));
std::memset(data, 0, size * sizeof(T));
}
}
}
~buf() noexcept
{
if (data == nullptr) { return; }
switch (memory_type) {
#ifndef BUILD_CPU_ONLY
case MemoryType::Device: {
cudaFree(data);
} break;
#endif
default: {
free(data);
}
}
}

[[nodiscard]] auto move(MemoryType target_memory_type) -> buf<T>
{
buf<T> r{target_memory_type, size};
#ifndef BUILD_CPU_ONLY
if ((memory_type == MemoryType::Device && target_memory_type != MemoryType::Device) ||
(memory_type != MemoryType::Device && target_memory_type == MemoryType::Device)) {
cudaMemcpy(r.data, data, size * sizeof(T), cudaMemcpyDefault);
return r;
}
#endif
std::swap(data, r.data);
return r;
}
};

struct cuda_timer {
private:
std::optional<cudaStream_t> stream_;
Expand Down Expand Up @@ -244,16 +193,102 @@ inline auto get_stream_from_global_pool() -> cudaStream_t
#endif
}

/**
 * A pair of same-sized host and device scratch buffers used for benchmark results,
 * tied to a single CUDA stream for all asynchronous operations.
 *
 * NOTE(review): the CUDA calls here ignore their error codes; presumably failures
 * surface via later operations on the stream — confirm this is acceptable.
 */
struct result_buffer {
  /** Allocate `size` bytes on the host and (in GPU builds) on the device. */
  explicit result_buffer(size_t size, cudaStream_t stream) : size_{size}, stream_{stream}
  {
    if (size_ == 0) { return; }
    data_host_ = malloc(size_);
#ifndef BUILD_CPU_ONLY
    // Stream-ordered allocation followed by a sync so the pointer is usable on return.
    cudaMallocAsync(&data_device_, size_, stream_);
    cudaStreamSynchronize(stream_);
#endif
  }
  // Non-copyable and non-movable: the raw pointers are handed out to callers,
  // so the object must stay at a stable address for its whole lifetime.
  result_buffer() = delete;
  result_buffer(result_buffer&&) = delete;
  result_buffer& operator=(result_buffer&&) = delete;
  result_buffer(const result_buffer&) = delete;
  result_buffer& operator=(const result_buffer&) = delete;
  ~result_buffer() noexcept
  {
    if (size_ == 0) { return; }
#ifndef BUILD_CPU_ONLY
    // Free in stream order, then sync so the memory is released before destruction ends.
    cudaFreeAsync(data_device_, stream_);
    cudaStreamSynchronize(stream_);
#endif
    free(data_host_);
  }

  /** Size of each of the two buffers in bytes. */
  [[nodiscard]] auto size() const noexcept { return size_; }
  /**
   * Raw pointer to the copy of the data residing in `loc`
   * (the host pointer is returned for any non-device memory type).
   */
  [[nodiscard]] auto data(ann::MemoryType loc) const noexcept
  {
    switch (loc) {
      case MemoryType::Device: return data_device_;
      default: return data_host_;
    }
  }

  /** Copy the contents from the `src` side to the `dst` side; no-op if they alias. */
  void transfer_data(ann::MemoryType dst, ann::MemoryType src)
  {
    auto dst_ptr = data(dst);
    auto src_ptr = data(src);
    if (dst_ptr == src_ptr) { return; }
#ifndef BUILD_CPU_ONLY
    cudaMemcpyAsync(dst_ptr, src_ptr, size_, cudaMemcpyDefault, stream_);
    cudaStreamSynchronize(stream_);
#endif
  }

 private:
  size_t size_{0};
  cudaStream_t stream_ = nullptr;  // the stream used for all async ops on this buffer
  void* data_host_ = nullptr;
  void* data_device_ = nullptr;
};

namespace detail {
// Pool of result buffers, one slot per benchmark thread (indexed by benchmark_thread_id).
inline std::vector<std::unique_ptr<result_buffer>> global_result_buffer_pool(0);
// Guards all accesses to global_result_buffer_pool.
inline std::mutex grp_mutex;
}  // namespace detail

/**
 * Get a result buffer associated with the current benchmark thread.
 *
 * Note, the allocations are reused between the benchmark cases.
 * This reduces the setup overhead and number of times the context is being blocked
 * (this is relevant if there is a persistent kernel running across multiples benchmark cases).
 *
 * @param size required capacity in bytes; the pooled buffer is re-allocated only if smaller.
 * @return a buffer of at least `size` bytes whose first `size` bytes are zeroed
 *         on both the host and (in GPU builds) the device side.
 */
inline auto get_result_buffer_from_global_pool(size_t size) -> result_buffer&
{
  auto stream = get_stream_from_global_pool();
  // The lock covers only the pool lookup/resize; using the returned reference unlocked
  // is fine because each benchmark thread only ever touches its own slot.
  auto& rb = [stream, size]() -> result_buffer& {
    std::lock_guard guard(detail::grp_mutex);
    if (static_cast<int>(detail::global_result_buffer_pool.size()) < benchmark_n_threads) {
      detail::global_result_buffer_pool.resize(benchmark_n_threads);
    }
    auto& rb = detail::global_result_buffer_pool[benchmark_thread_id];
    if (!rb || rb->size() < size) { rb = std::make_unique<result_buffer>(size, stream); }
    return *rb;
  }();

  // Zero only the requested prefix — the pooled buffer may be larger than `size`.
  memset(rb.data(MemoryType::Host), 0, size);
#ifndef BUILD_CPU_ONLY
  cudaMemsetAsync(rb.data(MemoryType::Device), 0, size, stream);
  cudaStreamSynchronize(stream);
#endif
  return rb;
}

/**
* Delete all streams in the global pool.
* Delete all streams and memory allocations in the global pool.
* It's called at the end of the `main` function - before global/static variables and cuda context
* is destroyed - to make sure they are destroyed gracefully and correctly seen by analysis tools
* such as nsys.
*/
inline void reset_global_stream_pool()
inline void reset_global_device_resources()
{
#ifndef BUILD_CPU_ONLY
std::lock_guard guard(detail::gsp_mutex);
detail::global_result_buffer_pool.resize(0);
detail::global_stream_pool.resize(0);
#endif
}
Expand Down
22 changes: 14 additions & 8 deletions cpp/bench/ann/src/raft/raft_ann_bench_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ class configured_raft_resources {
* It's used by the copy constructor.
*/
explicit configured_raft_resources(const std::shared_ptr<shared_raft_resources>& shared_res)
: shared_res_{shared_res}, res_{rmm::cuda_stream_view(get_stream_from_global_pool())}
: shared_res_{shared_res},
res_{std::make_unique<raft::device_resources>(
rmm::cuda_stream_view(get_stream_from_global_pool()))}
{
}

Expand All @@ -131,9 +133,9 @@ class configured_raft_resources {
{
}

configured_raft_resources(configured_raft_resources&&) = delete;
configured_raft_resources& operator=(configured_raft_resources&&) = delete;
~configured_raft_resources() = default;
configured_raft_resources(configured_raft_resources&&);
configured_raft_resources& operator=(configured_raft_resources&&);
~configured_raft_resources() = default;
configured_raft_resources(const configured_raft_resources& res)
: configured_raft_resources{res.shared_res_}
{
Expand All @@ -144,11 +146,11 @@ class configured_raft_resources {
return *this;
}

operator raft::resources&() noexcept { return res_; }
operator const raft::resources&() const noexcept { return res_; }
operator raft::resources&() noexcept { return *res_; }
operator const raft::resources&() const noexcept { return *res_; }

/** Get the main stream */
[[nodiscard]] auto get_sync_stream() const noexcept { return resource::get_cuda_stream(res_); }
[[nodiscard]] auto get_sync_stream() const noexcept { return resource::get_cuda_stream(*res_); }

private:
/** The resources shared among multiple raft handles / threads. */
Expand All @@ -157,7 +159,11 @@ class configured_raft_resources {
* Until we make the use of copies of raft::resources thread-safe, each benchmark wrapper must
* have its own copy of it.
*/
raft::device_resources res_;
std::unique_ptr<raft::device_resources> res_ = std::make_unique<raft::device_resources>();
};

inline configured_raft_resources::configured_raft_resources(configured_raft_resources&&) = default;
inline configured_raft_resources& configured_raft_resources::operator=(
configured_raft_resources&&) = default;

} // namespace raft::bench::ann
20 changes: 20 additions & 0 deletions cpp/bench/ann/src/raft/raft_cagra_float.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
 * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "raft_cagra_wrapper.h"

namespace raft::bench::ann {
// Explicit instantiation for the float data type; one instantiation per translation
// unit keeps per-file compile time and memory usage manageable.
template class RaftCagra<float, uint32_t>;
}  // namespace raft::bench::ann
20 changes: 20 additions & 0 deletions cpp/bench/ann/src/raft/raft_cagra_half.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
 * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "raft_cagra_wrapper.h"

namespace raft::bench::ann {
// Explicit instantiation for the half data type; one instantiation per translation
// unit keeps per-file compile time and memory usage manageable.
template class RaftCagra<half, uint32_t>;
}  // namespace raft::bench::ann
20 changes: 20 additions & 0 deletions cpp/bench/ann/src/raft/raft_cagra_int8_t.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
 * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "raft_cagra_wrapper.h"

namespace raft::bench::ann {
// Explicit instantiation for the int8_t data type; one instantiation per translation
// unit keeps per-file compile time and memory usage manageable.
template class RaftCagra<int8_t, uint32_t>;
}  // namespace raft::bench::ann
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,4 @@

namespace raft::bench::ann {
template class RaftCagra<uint8_t, uint32_t>;
template class RaftCagra<int8_t, uint32_t>;
template class RaftCagra<half, uint32_t>;
template class RaftCagra<float, uint32_t>;
} // namespace raft::bench::ann
Loading

0 comments on commit c27564e

Please sign in to comment.