diff --git a/cpp/bench/ann/src/common/ann_types.hpp b/cpp/bench/ann/src/common/ann_types.hpp
index 9b77c9df91..c6213059dc 100644
--- a/cpp/bench/ann/src/common/ann_types.hpp
+++ b/cpp/bench/ann/src/common/ann_types.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -74,13 +74,33 @@ struct AlgoProperty {
 class AnnBase {
  public:
  inline AnnBase(Metric metric, int dim) : metric_(metric), dim_(dim) {}
- virtual ~AnnBase() = default;
+ virtual ~AnnBase() noexcept = default;

  protected:
  Metric metric_;
  int dim_;
 };

+/**
+ * GPU-based algorithms that do not perform CPU synchronization at the end of their build or
+ * search methods must implement this interface.
+ *
+ * The `cuda_timer` / `cuda_lap` from `util.hpp` uses this stream to record GPU times with events
+ * and, if necessary, also synchronize (via events) between iterations.
+ *
+ * If the algo does not implement this interface, GPU timings are disabled.
+ */
+class AnnGPU {
+ public:
+  /**
+   * Return the main cuda stream for this algorithm.
+   * If any work is done in multiple streams, they should synchronize with the main stream at the
+   * end.
+   */
+  [[nodiscard]] virtual auto get_sync_stream() const noexcept -> cudaStream_t = 0;
+  virtual ~AnnGPU() noexcept = default;
+};
+
 template <typename T>
 class ANN : public AnnBase {
  public:
@@ -91,18 +111,15 @@ class ANN : public AnnBase {
  };

  inline ANN(Metric metric, int dim) : AnnBase(metric, dim) {}
+ virtual ~ANN() noexcept override = default;

- virtual void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) = 0;
+ virtual void build(const T* dataset, size_t nrow) = 0;

  virtual void set_search_param(const AnnSearchParam& param) = 0;
  // TODO: this assumes that an algorithm can always return k results.
  // This is not always possible.
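For orientation, a minimal sketch (not part of the diff; `MyGpuAlgo` is hypothetical, and the remaining pure-virtual `ANN<T>` methods are omitted) of how a wrapper opts into the new GPU timing path. The benchmark `dynamic_cast`s each algorithm to `AnnGPU`; when the cast succeeds, `cuda_timer` records its start/stop events on `get_sync_stream()` instead of a timer-owned stream:

```cpp
// Hypothetical wrapper sketch: implementing AnnGPU enables GPU timings.
template <typename T>
class MyGpuAlgo : public ANN<T>, public AnnGPU {
 public:
  MyGpuAlgo(Metric metric, int dim) : ANN<T>(metric, dim)
  {
    cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking);
  }
  ~MyGpuAlgo() noexcept override { cudaStreamDestroy(stream_); }

  // All GPU work submitted by build()/search() must be ordered with respect to
  // this stream; cuda_timer uses it to measure GPU time without device-wide syncs.
  [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override
  {
    return stream_;
  }

 private:
  cudaStream_t stream_{nullptr};
};
```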
- virtual void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const = 0; + virtual void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const = 0; virtual void save(const std::string& file) const = 0; virtual void load(const std::string& file) = 0; diff --git a/cpp/bench/ann/src/common/benchmark.hpp b/cpp/bench/ann/src/common/benchmark.hpp index 53f31d6232..851993712c 100644 --- a/cpp/bench/ann/src/common/benchmark.hpp +++ b/cpp/bench/ann/src/common/benchmark.hpp @@ -24,10 +24,8 @@ #include #include -#include #include #include -#include #include #include #include @@ -38,11 +36,8 @@ #include #include #include -namespace raft::bench::ann { -std::mutex init_mutex; -std::condition_variable cond_var; -std::atomic_int processed_threads{0}; +namespace raft::bench::ann { static inline std::unique_ptr current_algo{nullptr}; static inline std::unique_ptr current_algo_props{nullptr}; @@ -126,6 +121,9 @@ void bench_build(::benchmark::State& state, Configuration::Index index, bool force_overwrite) { + // NB: these two thread-local vars can be used within algo wrappers + raft::bench::ann::benchmark_thread_id = state.thread_index(); + raft::bench::ann::benchmark_n_threads = state.threads(); dump_parameters(state, index.build_param); if (file_exists(index.file)) { if (force_overwrite) { @@ -149,21 +147,23 @@ void bench_build(::benchmark::State& state, const T* base_set = dataset->base_set(algo_property.dataset_memory_type); std::size_t index_size = dataset->base_set_size(); - cuda_timer gpu_timer; + cuda_timer gpu_timer{algo}; { nvtx_case nvtx{state.name()}; for (auto _ : state) { [[maybe_unused]] auto ntx_lap = nvtx.lap(); [[maybe_unused]] auto gpu_lap = gpu_timer.lap(); try { - algo->build(base_set, index_size, gpu_timer.stream()); + algo->build(base_set, index_size); } catch (const std::exception& e) { state.SkipWithError(std::string(e.what())); } } } - state.counters.insert( - {{"GPU", gpu_timer.total_time() / state.iterations()}, {"index_size", index_size}}); + if (gpu_timer.active()) { + state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}}); + } + state.counters.insert({{"index_size", index_size}}); if (state.skipped()) { return; } make_sure_parent_dir_exists(index.file); @@ -177,7 +177,10 @@ void bench_search(::benchmark::State& state, std::shared_ptr> dataset, Objective metric_objective) { - std::size_t queries_processed = 0; + // NB: these two thread-local vars can be used within algo wrappers + raft::bench::ann::benchmark_thread_id = state.thread_index(); + raft::bench::ann::benchmark_n_threads = state.threads(); + std::size_t queries_processed = 0; const auto& sp_json = index.search_params[search_param_ix]; @@ -194,7 +197,8 @@ void bench_search(::benchmark::State& state, std::stringstream msg; msg << "Not enough queries in benchmark set. 
Expected " << n_queries << ", actual " << dataset->query_set_size(); - return state.SkipWithError(msg.str()); + state.SkipWithError(msg.str()); + return; } // Each thread start from a different offset, so that the queries that they process do not @@ -214,9 +218,8 @@ void bench_search(::benchmark::State& state, /** * Make sure the first thread loads the algo and dataset */ - if (state.thread_index() == 0) { - std::unique_lock lk(init_mutex); - cond_var.wait(lk, [] { return processed_threads.load(std::memory_order_acquire) == 0; }); + progress_barrier load_barrier{}; + if (load_barrier.arrive(1) == 0) { // algo is static to cache it between close search runs to save time on index loading static std::string index_file = ""; if (index.file != index_file) { @@ -257,21 +260,16 @@ void bench_search(::benchmark::State& state, } try { algo->set_search_param(*search_param); - } catch (const std::exception& ex) { state.SkipWithError("An error occurred setting search parameters: " + std::string(ex.what())); return; } query_set = dataset->query_set(current_algo_props->query_memory_type); - processed_threads.store(state.threads(), std::memory_order_acq_rel); - cond_var.notify_all(); + load_barrier.arrive(state.threads()); } else { - std::unique_lock lk(init_mutex); // All other threads will wait for the first thread to initialize the algo. - cond_var.wait(lk, [&state] { - return processed_threads.load(std::memory_order_acquire) == state.threads(); - }); + load_barrier.wait(state.threads() * 2); // gbench ensures that all threads are synchronized at the start of the benchmark loop. // We are accessing shared variables (like current_algo, current_algo_probs) before the // benchmark loop, therefore the synchronization here is necessary. @@ -286,26 +284,31 @@ void bench_search(::benchmark::State& state, std::shared_ptr> neighbors = std::make_shared>(current_algo_props->query_memory_type, k * query_set_size); - cuda_timer gpu_timer; { nvtx_case nvtx{state.name()}; - auto algo = dynamic_cast*>(current_algo.get())->copy(); + std::unique_ptr> algo{nullptr}; + try { + dynamic_cast*>(current_algo.get())->copy().swap(algo); + } catch (const std::exception& e) { + state.SkipWithError("Algo::copy: " + std::string(e.what())); + return; + } + // Initialize with algo, so that the timer.lap() object can sync with algo::get_sync_stream() + cuda_timer gpu_timer{algo}; auto start = std::chrono::high_resolution_clock::now(); for (auto _ : state) { [[maybe_unused]] auto ntx_lap = nvtx.lap(); [[maybe_unused]] auto gpu_lap = gpu_timer.lap(); - - // run the search try { algo->search(query_set + batch_offset * dataset->dim(), n_queries, k, neighbors->data + out_offset * k, - distances->data + out_offset * k, - gpu_timer.stream()); + distances->data + out_offset * k); } catch (const std::exception& e) { - state.SkipWithError(std::string(e.what())); + state.SkipWithError("Benchmark loop: " + std::string(e.what())); + break; } // advance to the next batch @@ -318,22 +321,19 @@ void bench_search(::benchmark::State& state, auto duration = std::chrono::duration_cast>(end - start).count(); if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); } state.counters.insert({"Latency", {duration, benchmark::Counter::kAvgIterations}}); + + if (gpu_timer.active()) { + state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}}); + } } state.SetItemsProcessed(queries_processed); - if (cudart.found()) { - state.counters.insert({"GPU", {gpu_timer.total_time(), 
benchmark::Counter::kAvgIterations}});
- }
  // This will be the total number of queries across all threads
  state.counters.insert({{"total_queries", queries_processed}});

  if (state.skipped()) { return; }

- // assume thread has finished processing successfully at this point
- // last thread to finish processing notifies all
- if (processed_threads-- == 0) { cond_var.notify_all(); }
-
  // Each thread calculates recall on their partition of queries.
  // evaluate recall
  if (dataset->max_k() >= k) {
@@ -673,6 +673,16 @@ inline auto run_main(int argc, char** argv) -> int
                                override_kv,
                                metric_objective,
                                threads);
+  } else if (dtype == "half") {
+    dispatch_benchmark<half>(conf,
+                             force_overwrite,
+                             build_mode,
+                             search_mode,
+                             data_prefix,
+                             index_prefix,
+                             override_kv,
+                             metric_objective,
+                             threads);
  } else if (dtype == "uint8") {
    dispatch_benchmark<std::uint8_t>(conf,
                                     force_overwrite,
@@ -705,6 +715,8 @@ inline auto run_main(int argc, char** argv) -> int
  // Release a possibly cached ANN object, so that it cannot be alive longer than the handle
  // to a shared library it depends on (dynamic benchmark executable).
  current_algo.reset();
+ current_algo_props.reset();
+ reset_global_stream_pool();
  return 0;
}
};  // namespace raft::bench::ann
diff --git a/cpp/bench/ann/src/common/conf.hpp b/cpp/bench/ann/src/common/conf.hpp
index 405b00a74e..92ba86c6cf 100644
--- a/cpp/bench/ann/src/common/conf.hpp
+++ b/cpp/bench/ann/src/common/conf.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -91,11 +91,16 @@ class Configuration {
      dataset_conf_.dtype = conf.at("dtype");
    } else {
      auto filename = dataset_conf_.base_file;
-     if (!filename.compare(filename.size() - 4, 4, "fbin")) {
+     if (filename.size() > 6 && filename.compare(filename.size() - 6, 6, "f16bin") == 0) {
+       dataset_conf_.dtype = "half";
+     } else if (filename.size() > 9 &&
+                filename.compare(filename.size() - 9, 9, "fp16.fbin") == 0) {
+       dataset_conf_.dtype = "half";
+     } else if (filename.size() > 4 && filename.compare(filename.size() - 4, 4, "fbin") == 0) {
        dataset_conf_.dtype = "float";
-     } else if (!filename.compare(filename.size() - 5, 5, "u8bin")) {
+     } else if (filename.size() > 5 && filename.compare(filename.size() - 5, 5, "u8bin") == 0) {
        dataset_conf_.dtype = "uint8";
-     } else if (!filename.compare(filename.size() - 5, 5, "i8bin")) {
+     } else if (filename.size() > 5 && filename.compare(filename.size() - 5, 5, "i8bin") == 0) {
        dataset_conf_.dtype = "int8";
      } else {
        log_error("Could not determine data type of the dataset %s", filename.c_str());
diff --git a/cpp/bench/ann/src/common/cuda_stub.hpp b/cpp/bench/ann/src/common/cuda_stub.hpp
index 6e3b63cd38..512ccbce34 100644
--- a/cpp/bench/ann/src/common/cuda_stub.hpp
+++ b/cpp/bench/ann/src/common/cuda_stub.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -33,14 +33,17 @@
______________________________________________________________________________
*/
#ifndef BUILD_CPU_ONLY
+#include <cuda_fp16.h>
#include <cuda_runtime_api.h>
#ifdef ANN_BENCH_LINK_CUDART
#include <cstring>
#include <dlfcn.h>
#endif
#else
+#include <cstdint>
typedef void* cudaStream_t;
typedef void* cudaEvent_t;
+typedef uint16_t half;
#endif

namespace raft::bench::ann {
diff --git a/cpp/bench/ann/src/common/dataset.hpp b/cpp/bench/ann/src/common/dataset.hpp
index ccc5915b3c..8fcff77d3c 100644
--- a/cpp/bench/ann/src/common/dataset.hpp
+++ b/cpp/bench/ann/src/common/dataset.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -17,12 +17,6 @@
 #include "util.hpp"

-#ifndef BUILD_CPU_ONLY
-#include <cuda_fp16.h>
-#else
-typedef uint16_t half;
-#endif
-
 #include
 #include
 #include
@@ -222,7 +216,7 @@ void BinFile<T>::check_suffix_()
      throw std::runtime_error("BinFile should has .fbin suffix: " + file_);
    }
  } else if constexpr (std::is_same_v<T, half>) {
-   if (suffix != "f16bin") {
+   if (suffix != "f16bin" && suffix != "fbin") {
      throw std::runtime_error("BinFile should has .f16bin suffix: " + file_);
    }
  } else if constexpr (std::is_same_v) {
diff --git a/cpp/bench/ann/src/common/util.hpp b/cpp/bench/ann/src/common/util.hpp
index e9e4a9ad21..8f6468befa 100644
--- a/cpp/bench/ann/src/common/util.hpp
+++ b/cpp/bench/ann/src/common/util.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -29,15 +29,34 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
+#include
+#include
 #include
 #include
+#include

 namespace raft::bench::ann {

+/**
+ * Current thread id as given by the benchmark State.
+ * It's populated on every call of a benchmark case.
+ * It's relevant in the 'throughput' mode of the search benchmarks,
+ * where some algorithms might want to coordinate allocation of the resources.
+ */
+inline thread_local int benchmark_thread_id = 0;
+/**
+ * Total concurrent thread count as given by the benchmark State.
+ * It's populated on every call of a benchmark case.
+ * It's relevant in the 'throughput' mode of the search benchmarks,
+ * where some algorithms might want to coordinate allocation of the resources.
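+ *
+ * For example, `get_stream_from_global_pool()` below sizes its stream pool to
+ * `benchmark_n_threads` and hands each thread the stream at index `benchmark_thread_id`.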
+ */
+inline thread_local int benchmark_n_threads = 1;
+
 template <typename T>
 struct buf {
  MemoryType memory_type;
@@ -91,11 +110,19 @@ struct buf {

struct cuda_timer {
 private:
-  cudaStream_t stream_{nullptr};
+  std::optional<cudaStream_t> stream_;
  cudaEvent_t start_{nullptr};
  cudaEvent_t stop_{nullptr};
  double total_time_{0};

+  template <typename AnnT>
+  static inline auto extract_stream(AnnT* algo) -> std::optional<cudaStream_t>
+  {
+    auto gpu_ann = dynamic_cast<AnnGPU*>(algo);
+    if (gpu_ann != nullptr) { return std::make_optional(gpu_ann->get_sync_stream()); }
+    return std::nullopt;
+  }
+
 public:
  struct cuda_lap {
   private:
@@ -109,7 +136,6 @@ struct cuda_timer {
      : start_(start), stop_(stop), stream_(stream), total_time_(total_time)
    {
#ifndef BUILD_CPU_ONLY
-      cudaStreamSynchronize(stream_);
      cudaEventRecord(start_, stream_);
#endif
    }
@@ -127,34 +153,110 @@ struct cuda_timer {
    }
  };

-  cuda_timer()
+  explicit cuda_timer(std::optional<cudaStream_t> stream) : stream_{stream}
  {
#ifndef BUILD_CPU_ONLY
-    cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking);
-    cudaEventCreate(&stop_);
-    cudaEventCreate(&start_);
+    if (stream_.has_value()) {
+      cudaEventCreate(&stop_);
+      cudaEventCreate(&start_);
+    }
#endif
  }

+  template <typename AnnT>
+  explicit cuda_timer(const std::unique_ptr<AnnT>& algo) : cuda_timer{extract_stream(algo.get())}
+  {
+  }
+
  ~cuda_timer() noexcept
  {
#ifndef BUILD_CPU_ONLY
-    cudaEventDestroy(start_);
-    cudaEventDestroy(stop_);
-    cudaStreamDestroy(stream_);
+    if (stream_.has_value()) {
+      cudaStreamSynchronize(stream_.value());
+      cudaEventDestroy(start_);
+      cudaEventDestroy(stop_);
+    }
#endif
  }

-  [[nodiscard]] auto stream() const -> cudaStream_t { return stream_; }
+  cuda_timer()                  = delete;
+  cuda_timer(cuda_timer const&) = delete;
+  cuda_timer(cuda_timer&&)      = delete;
+  auto operator=(cuda_timer const&) -> cuda_timer& = delete;
+  auto operator=(cuda_timer&&) -> cuda_timer&      = delete;
+
+  [[nodiscard]] auto stream() const -> std::optional<cudaStream_t> { return stream_; }
+
+  [[nodiscard]] auto active() const -> bool { return stream_.has_value(); }

  [[nodiscard]] auto total_time() const -> double { return total_time_; }

-  [[nodiscard]] auto lap() -> cuda_timer::cuda_lap
+  [[nodiscard]] auto lap(bool enabled = true) -> std::optional<cuda_timer::cuda_lap>
  {
-    return cuda_lap{stream_, start_, stop_, total_time_};
+    return enabled && stream_.has_value()
+             ? std::make_optional<cuda_timer::cuda_lap>(stream_.value(), start_, stop_, total_time_)
+             : std::nullopt;
  }
};

+#ifndef BUILD_CPU_ONLY
+// ATM, rmm::stream does not support passing in flags; hence this helper type.
+struct non_blocking_stream {
+  non_blocking_stream() { cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking); }
+  ~non_blocking_stream() noexcept
+  {
+    if (stream_ != nullptr) { cudaStreamDestroy(stream_); }
+  }
+  non_blocking_stream(non_blocking_stream const&) = delete;
+  non_blocking_stream(non_blocking_stream&& other) noexcept { std::swap(stream_, other.stream_); }
+  auto operator=(non_blocking_stream const&) -> non_blocking_stream& = delete;
+  auto operator=(non_blocking_stream&&) -> non_blocking_stream&      = delete;
+  [[nodiscard]] auto view() const noexcept -> cudaStream_t { return stream_; }
+
+ private:
+  cudaStream_t stream_{nullptr};
+};
+
+namespace detail {
+inline std::vector<non_blocking_stream> global_stream_pool(0);
+inline std::mutex gsp_mutex;
+}  // namespace detail
+#endif
+
+/**
+ * Get a stream associated with the current benchmark thread.
+ *
+ * Note, the streams are reused between the benchmark cases.
+ * This makes it easier to profile and analyse multiple benchmark cases in one timeline using tools
+ * like nsys.
+ */
+inline auto get_stream_from_global_pool() -> cudaStream_t
+{
+#ifndef BUILD_CPU_ONLY
+  std::lock_guard guard(detail::gsp_mutex);
+  if (int(detail::global_stream_pool.size()) < benchmark_n_threads) {
+    detail::global_stream_pool.resize(benchmark_n_threads);
+  }
+  return detail::global_stream_pool[benchmark_thread_id].view();
+#else
+  return nullptr;
+#endif
+}
+
+/**
+ * Delete all streams in the global pool.
+ * It's called at the end of the `main` function - before global/static variables and the cuda
+ * context are destroyed - to make sure they are destroyed gracefully and correctly seen by
+ * analysis tools such as nsys.
+ */
+inline void reset_global_stream_pool()
+{
+#ifndef BUILD_CPU_ONLY
+  std::lock_guard guard(detail::gsp_mutex);
+  detail::global_stream_pool.resize(0);
+#endif
+}
+
inline auto cuda_info()
{
  std::vector> props;
@@ -253,6 +355,77 @@ struct nvtx_case {
  }
};

+/**
+ * A progress tracker that allows syncing threads multiple times and resets the global
+ * progress once the threads are done.
+ */
+struct progress_barrier {
+  progress_barrier() = default;
+  ~progress_barrier() noexcept
+  {
+    {
+      // Lock makes sure the notified threads see the updates to `done_`.
+      std::unique_lock lk(mutex_);
+      done_.store(true, std::memory_order_relaxed);
+      cv_.notify_all();
+    }
+    // This is the only place where the order of the updates to thread_progress_ and done_ is
+    // important. They are not guarded by the mutex, and `done_` must not be reset to `true` by
+    // other threads after the `total_progress_` is zero.
+    // Hence the default memory order (std::memory_order_seq_cst).
+    auto rem = total_progress_.fetch_sub(thread_progress_);
+    if (rem == thread_progress_) {
+      // the last thread to exit clears the progress state.
+      done_.store(false);
+    }
+  }
+
+  /**
+   * Advance the progress counter by `n` and return the previous progress value.
+   *
+   * This can be used to track which thread arrives on the call site first.
+   *
+   * @return the previous progress counter value (before incrementing it by `n`).
+   */
+  auto arrive(int n)
+  {
+    thread_progress_ += n;
+    // Lock makes sure the notified threads see the updates to `total_progress_`.
+    std::unique_lock lk(mutex_);
+    auto prev = total_progress_.fetch_add(n, std::memory_order_relaxed);
+    cv_.notify_all();
+    return prev;
+  }
+
+  /**
+   * Wait till the progress counter reaches `limit` or finishes abnormally.
+   *
+   * @return the latest observed value of the progress counter.
+   */
+  auto wait(int limit)
+  {
+    int cur = total_progress_.load(std::memory_order_relaxed);
+    if (cur >= limit) { return cur; }
+    auto done = done_.load(std::memory_order_relaxed);
+    if (done) { return cur; }
+    std::unique_lock lk(mutex_);
+    while (cur < limit && !done) {
+      using namespace std::chrono_literals;
+      cv_.wait_for(lk, 10ms);
+      cur  = total_progress_.load(std::memory_order_relaxed);
+      done = done_.load(std::memory_order_relaxed);
+    }
+    return cur;
+  }
+
+ private:
+  static inline std::atomic<int> total_progress_;
+  static inline std::atomic<bool> done_;
+  static inline std::mutex mutex_;
+  static inline std::condition_variable cv_;
+  int thread_progress_{0};
+};
+
inline std::vector<std::string> split(const std::string& s, char delimiter)
{
  std::vector<std::string> tokens;
diff --git a/cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h b/cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
index 3cc4e10b49..407f7148df 100644
--- a/cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
+++ b/cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,7 +73,7 @@ class FaissCpu : public ANN { static_assert(std::is_same_v, "faiss support only float type"); } - void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) final; + void build(const T* dataset, size_t nrow) final; void set_search_param(const AnnSearchParam& param) override; @@ -88,12 +88,8 @@ class FaissCpu : public ANN { // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const final; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const final; AlgoProperty get_preference() const override { @@ -123,7 +119,7 @@ class FaissCpu : public ANN { }; template -void FaissCpu::build(const T* dataset, size_t nrow, cudaStream_t stream) +void FaissCpu::build(const T* dataset, size_t nrow) { auto index_ivf = dynamic_cast(index_.get()); if (index_ivf != nullptr) { @@ -172,12 +168,8 @@ void FaissCpu::set_search_param(const AnnSearchParam& param) } template -void FaissCpu::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void FaissCpu::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { static_assert(sizeof(size_t) == sizeof(faiss::idx_t), "sizes of size_t and faiss::idx_t are different"); diff --git a/cpp/bench/ann/src/faiss/faiss_gpu_wrapper.h b/cpp/bench/ann/src/faiss/faiss_gpu_wrapper.h index 7879530753..633098fd1d 100644 --- a/cpp/bench/ann/src/faiss/faiss_gpu_wrapper.h +++ b/cpp/bench/ann/src/faiss/faiss_gpu_wrapper.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -35,9 +35,6 @@ #include #include -#include -#include - #include #include #include @@ -80,21 +77,8 @@ class OmpSingleThreadScope { namespace raft::bench::ann { -struct copyable_event { - copyable_event() { RAFT_CUDA_TRY(cudaEventCreate(&value_, cudaEventDisableTiming)); } - ~copyable_event() { RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(value_)); } - copyable_event(copyable_event&&) = default; - copyable_event& operator=(copyable_event&&) = default; - copyable_event(const copyable_event& res) : copyable_event{} {} - copyable_event& operator=(const copyable_event& other) = delete; - operator cudaEvent_t() const noexcept { return value_; } - - private: - cudaEvent_t value_{nullptr}; -}; - template -class FaissGpu : public ANN { +class FaissGpu : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; struct SearchParam : public AnnSearchParam { @@ -119,7 +103,7 @@ class FaissGpu : public ANN { RAFT_CUDA_TRY(cudaGetDevice(&device_)); } - void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) final; + void build(const T* dataset, size_t nrow) final; virtual void set_search_param(const FaissGpu::AnnSearchParam& param) {} @@ -127,12 +111,13 @@ class FaissGpu : public ANN { // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const final; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const final; + + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override + { + return gpu_resource_->getDefaultStream(device_); + } AlgoProperty get_preference() const override { @@ -150,12 +135,6 @@ class FaissGpu : public ANN { template void load_(const std::string& file); - void stream_wait(cudaStream_t stream) const - { - RAFT_CUDA_TRY(cudaEventRecord(sync_, gpu_resource_->getDefaultStream(device_))); - RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, sync_)); - } - /** [NOTE Multithreading] * * `gpu_resource_` is a shared resource: @@ -178,7 +157,6 @@ class FaissGpu : public ANN { faiss::MetricType metric_type_; int nlist_; int device_; - copyable_event sync_{}; double training_sample_fraction_; std::shared_ptr search_params_; const T* dataset_; @@ -186,7 +164,7 @@ class FaissGpu : public ANN { }; template -void FaissGpu::build(const T* dataset, size_t nrow, cudaStream_t stream) +void FaissGpu::build(const T* dataset, size_t nrow) { OmpSingleThreadScope omp_single_thread; auto index_ivf = dynamic_cast(index_.get()); @@ -214,16 +192,11 @@ void FaissGpu::build(const T* dataset, size_t nrow, cudaStream_t stream) index_->train(nrow, dataset); // faiss::gpu::GpuIndexFlat::train() will do nothing assert(index_->is_trained); index_->add(nrow, dataset); - stream_wait(stream); } template -void FaissGpu::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void FaissGpu::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { static_assert(sizeof(size_t) == sizeof(faiss::idx_t), "sizes of size_t and faiss::idx_t are different"); @@ -246,7 +219,6 @@ void FaissGpu::search(const T* queries, reinterpret_cast(neighbors), this->search_params_.get()); } - stream_wait(stream); } template diff --git a/cpp/bench/ann/src/ggnn/ggnn_benchmark.cu b/cpp/bench/ann/src/ggnn/ggnn_benchmark.cu index 3b2e97062f..884476ea12 100644 --- 
a/cpp/bench/ann/src/ggnn/ggnn_benchmark.cu +++ b/cpp/bench/ann/src/ggnn/ggnn_benchmark.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,11 +92,10 @@ std::unique_ptr> create_algo(const std::string& algo, raft::bench::ann::Metric metric = parse_metric(distance); std::unique_ptr> ann; - if constexpr (std::is_same_v) {} - - if constexpr (std::is_same_v) {} - - if (algo == "ggnn") { ann = make_algo(metric, dim, conf); } + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + if (algo == "ggnn") { ann = make_algo(metric, dim, conf); } + } if (!ann) { throw std::runtime_error("invalid algo: '" + algo + "'"); } return ann; @@ -106,10 +105,13 @@ template std::unique_ptr::AnnSearchParam> create_search_param( const std::string& algo, const nlohmann::json& conf) { - if (algo == "ggnn") { - auto param = std::make_unique::SearchParam>(); - parse_search_param(conf, *param); - return param; + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + if (algo == "ggnn") { + auto param = std::make_unique::SearchParam>(); + parse_search_param(conf, *param); + return param; + } } // else throw std::runtime_error("invalid algo: '" + algo + "'"); diff --git a/cpp/bench/ann/src/ggnn/ggnn_wrapper.cuh b/cpp/bench/ann/src/ggnn/ggnn_wrapper.cuh index 20c50a5119..5f5013ef7f 100644 --- a/cpp/bench/ann/src/ggnn/ggnn_wrapper.cuh +++ b/cpp/bench/ann/src/ggnn/ggnn_wrapper.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,6 +17,7 @@ #pragma once #include "../common/ann_types.hpp" +#include "../common/util.hpp" #include #include @@ -30,7 +31,7 @@ template -class Ggnn : public ANN { +class Ggnn : public ANN, public AnnGPU { public: struct BuildParam { int k_build{24}; // KBuild @@ -53,20 +54,17 @@ class Ggnn : public ANN { Ggnn(Metric metric, int dim, const BuildParam& param); - void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) override - { - impl_->build(dataset, nrow, stream); - } + void build(const T* dataset, size_t nrow) override { impl_->build(dataset, nrow); } void set_search_param(const AnnSearchParam& param) override { impl_->set_search_param(param); } - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override + { + impl_->search(queries, batch_size, k, neighbors, distances); + } + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override { - impl_->search(queries, batch_size, k, neighbors, distances, stream); + return dynamic_cast(impl_.get())->get_sync_stream(); } void save(const std::string& file) const override { impl_->save(file); } @@ -115,27 +113,27 @@ Ggnn::Ggnn(Metric metric, int dim, const BuildParam& param) : ANN(metric, } template -class GgnnImpl : public ANN { +class GgnnImpl : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; GgnnImpl(Metric metric, int dim, const typename Ggnn::BuildParam& param); - void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) override; + void build(const T* dataset, size_t nrow) override; void set_search_param(const AnnSearchParam& param) override; - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override; + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override { return stream_; } void save(const std::string& file) const override; void load(const std::string& file) override; std::unique_ptr> copy() override { - return std::make_unique>(*this); + auto r = std::make_unique>(*this); + // set the thread-local stream to the copied handle. 
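+    // (In the throughput mode, gbench invokes copy() once per search thread, so every
+    //  copy gets its own stream from the global pool; the streams are reused across
+    //  benchmark cases to keep nsys timelines comparable.)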
+ r->stream_ = raft::bench::ann::get_stream_from_global_pool(); + return r; }; AlgoProperty get_preference() const override @@ -166,13 +164,40 @@ class GgnnImpl : public ANN { std::shared_ptr ggnn_; typename Ggnn::BuildParam build_param_; typename Ggnn::SearchParam search_param_; + cudaStream_t stream_; + const T* base_dataset = nullptr; + size_t base_n_rows = 0; + std::optional graph_file = std::nullopt; + + void load_impl() + { + if (base_dataset == nullptr) { return; } + if (base_n_rows == 0) { return; } + int device; + RAFT_CUDA_TRY(cudaGetDevice(&device)); + ggnn_ = std::make_shared( + device, base_n_rows, build_param_.num_layers, true, build_param_.tau); + ggnn_->set_base_data(base_dataset); + ggnn_->set_stream(get_sync_stream()); + if (graph_file.has_value()) { + auto& ggnn_host = ggnn_->ggnn_cpu_buffers.at(0); + auto& ggnn_device = ggnn_->ggnn_shards.at(0); + ggnn_->set_stream(get_sync_stream()); + + ggnn_host.load(graph_file.value()); + ggnn_host.uploadAsync(ggnn_device); + RAFT_CUDA_TRY(cudaStreamSynchronize(ggnn_device.stream)); + } + } }; template GgnnImpl::GgnnImpl(Metric metric, int dim, const typename Ggnn::BuildParam& param) - : ANN(metric, dim), build_param_(param) + : ANN(metric, dim), + build_param_(param), + stream_(raft::bench::ann::get_stream_from_global_pool()) { if (metric_ == Metric::kInnerProduct) { if (measure != Cosine) { throw std::runtime_error("mis-matched metric"); } @@ -187,17 +212,12 @@ GgnnImpl::GgnnImpl(Metric metric, } template -void GgnnImpl::build(const T* dataset, - size_t nrow, - cudaStream_t stream) +void GgnnImpl::build(const T* dataset, size_t nrow) { - int device; - RAFT_CUDA_TRY(cudaGetDevice(&device)); - ggnn_ = std::make_shared( - device, nrow, build_param_.num_layers, true, build_param_.tau); - - ggnn_->set_base_data(dataset); - ggnn_->set_stream(stream); + base_dataset = dataset; + base_n_rows = nrow; + graph_file = std::nullopt; + load_impl(); ggnn_->build(0); for (int i = 0; i < build_param_.refine_iterations; ++i) { ggnn_->refine(); @@ -207,7 +227,11 @@ void GgnnImpl::build(const T* dataset, template void GgnnImpl::set_search_dataset(const T* dataset, size_t nrow) { - ggnn_->set_base_data(dataset); + if (base_dataset != dataset || base_n_rows != nrow) { + base_dataset = dataset; + base_n_rows = nrow; + load_impl(); + } } template @@ -217,12 +241,8 @@ void GgnnImpl::set_search_param(const AnnSearc } template -void GgnnImpl::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void GgnnImpl::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { static_assert(sizeof(size_t) == sizeof(int64_t), "sizes of size_t and GGNN's KeyT are different"); if (k != KQuery) { @@ -231,7 +251,7 @@ void GgnnImpl::search(const T* queries, ", but this GGNN instance only supports k = " + std::to_string(KQuery)); } - ggnn_->set_stream(stream); + ggnn_->set_stream(get_sync_stream()); RAFT_CUDA_TRY(cudaMemcpyToSymbol(c_tau_query, &search_param_.tau, sizeof(float))); const int block_dim = search_param_.block_dim; @@ -276,7 +296,7 @@ void GgnnImpl::save(const std::string& file) c { auto& ggnn_host = ggnn_->ggnn_cpu_buffers.at(0); auto& ggnn_device = ggnn_->ggnn_shards.at(0); - ggnn_->set_stream(0); + ggnn_->set_stream(get_sync_stream()); ggnn_host.downloadAsync(ggnn_device); RAFT_CUDA_TRY(cudaStreamSynchronize(ggnn_device.stream)); @@ -286,13 +306,10 @@ void GgnnImpl::save(const std::string& file) c template void GgnnImpl::load(const std::string& file) { - 
auto& ggnn_host = ggnn_->ggnn_cpu_buffers.at(0); - auto& ggnn_device = ggnn_->ggnn_shards.at(0); - ggnn_->set_stream(0); - - ggnn_host.load(file); - ggnn_host.uploadAsync(ggnn_device); - RAFT_CUDA_TRY(cudaStreamSynchronize(ggnn_device.stream)); + if (!graph_file.has_value() || graph_file.value() != file) { + graph_file = file; + load_impl(); + } } } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/hnswlib/hnswlib_wrapper.h b/cpp/bench/ann/src/hnswlib/hnswlib_wrapper.h index 08b2f188c5..45f7698a2d 100644 --- a/cpp/bench/ann/src/hnswlib/hnswlib_wrapper.h +++ b/cpp/bench/ann/src/hnswlib/hnswlib_wrapper.h @@ -75,15 +75,11 @@ class HnswLib : public ANN { HnswLib(Metric metric, int dim, const BuildParam& param); - void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) override; + void build(const T* dataset, size_t nrow) override; void set_search_param(const AnnSearchParam& param) override; - void search(const T* query, - int batch_size, - int k, - size_t* indices, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* query, int batch_size, int k, size_t* indices, float* distances) const override; void save(const std::string& path_to_index) const override; void load(const std::string& path_to_index) override; @@ -131,7 +127,7 @@ HnswLib::HnswLib(Metric metric, int dim, const BuildParam& param) : ANN(me } template -void HnswLib::build(const T* dataset, size_t nrow, cudaStream_t) +void HnswLib::build(const T* dataset, size_t nrow) { if constexpr (std::is_same_v) { if (metric_ == Metric::kInnerProduct) { @@ -179,7 +175,7 @@ void HnswLib::set_search_param(const AnnSearchParam& param_) template void HnswLib::search( - const T* query, int batch_size, int k, size_t* indices, float* distances, cudaStream_t) const + const T* query, int batch_size, int k, size_t* indices, float* distances) const { auto f = [&](int i) { // hnsw can only handle a single vector at a time. diff --git a/cpp/bench/ann/src/raft/raft_ann_bench_utils.h b/cpp/bench/ann/src/raft/raft_ann_bench_utils.h index 638f498b59..40c1ecfa5e 100644 --- a/cpp/bench/ann/src/raft/raft_ann_bench_utils.h +++ b/cpp/bench/ann/src/raft/raft_ann_bench_utils.h @@ -15,22 +15,21 @@ */ #pragma once +#include "../common/util.hpp" + #include #include #include +#include #include #include +#include #include +#include #include -#include -#include -#include #include -#include -#include -#include #include namespace raft::bench::ann { @@ -47,6 +46,65 @@ inline raft::distance::DistanceType parse_metric_type(raft::bench::ann::Metric m } } +/** Report a more verbose error with a backtrace when OOM occurs on RMM side. */ +inline auto rmm_oom_callback(std::size_t bytes, void*) -> bool +{ + auto cuda_status = cudaGetLastError(); + size_t free = 0; + size_t total = 0; + RAFT_CUDA_TRY_NO_THROW(cudaMemGetInfo(&free, &total)); + RAFT_FAIL( + "Failed to allocate %zu bytes using RMM memory resource. " + "NB: latest cuda status = %s, free memory = %zu, total memory = %zu.", + bytes, + cudaGetErrorName(cuda_status), + free, + total); +} + +/** + * This container keeps the part of raft state that should be shared among multiple copies of raft + * handles (in different CPU threads). + * An example of this is an RMM memory resource: if we had an RMM memory pool per thread, we'd + * quickly run out of memory. 
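+ * It also installs `rmm_oom_callback` (above) via the `failure_callback_resource_adaptor`,
+ * so an out-of-memory error in any wrapper is reported together with the current free/total
+ * device memory.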
+ */
+class shared_raft_resources {
+ public:
+  using pool_mr_type = rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>;
+  using mr_type      = rmm::mr::failure_callback_resource_adaptor<pool_mr_type>;
+
+  shared_raft_resources()
+  try : orig_resource_{rmm::mr::get_current_device_resource()},
+    pool_resource_(orig_resource_, 1024 * 1024 * 1024ull),
+    resource_(&pool_resource_, rmm_oom_callback, nullptr) {
+    rmm::mr::set_current_device_resource(&resource_);
+  } catch (const std::exception& e) {
+    auto cuda_status = cudaGetLastError();
+    size_t free      = 0;
+    size_t total     = 0;
+    RAFT_CUDA_TRY_NO_THROW(cudaMemGetInfo(&free, &total));
+    RAFT_FAIL(
+      "Failed to initialize shared raft resources (NB: latest cuda status = %s, free memory = %zu, "
+      "total memory = %zu): %s",
+      cudaGetErrorName(cuda_status),
+      free,
+      total,
+      e.what());
+  }
+
+  shared_raft_resources(shared_raft_resources&&)            = delete;
+  shared_raft_resources& operator=(shared_raft_resources&&) = delete;
+  shared_raft_resources(const shared_raft_resources& res)   = delete;
+  shared_raft_resources& operator=(const shared_raft_resources& other) = delete;
+
+  ~shared_raft_resources() noexcept { rmm::mr::set_current_device_resource(orig_resource_); }
+
+ private:
+  rmm::mr::device_memory_resource* orig_resource_;
+  pool_mr_type pool_resource_;
+  mr_type resource_;
+};
+
/**
 * This struct is used by multiple raft benchmark wrappers. It serves as a thread-safe keeper of
 * shared and private GPU resources (see below).
@@ -58,48 +116,17 @@ inline raft::distance::DistanceType parse_metric_type(raft::bench::ann::Metric m
 */
class configured_raft_resources {
 public:
-  using device_mr_t = rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>;
 /**
  * This constructor has the shared state passed unmodified but creates the local state anew.
  * It's used by the copy constructor.
  */
-  explicit configured_raft_resources(const std::shared_ptr<device_mr_t>& mr)
-    : mr_{mr},
-      sync_{[]() {
-        auto* ev = new cudaEvent_t;
-        RAFT_CUDA_TRY(cudaEventCreate(ev, cudaEventDisableTiming));
-        return ev;
-      }(),
-      [](cudaEvent_t* ev) {
-        RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(*ev));
-        delete ev;
-      }},
-      res_{cudaStreamPerThread}
+  explicit configured_raft_resources(const std::shared_ptr<shared_raft_resources>& shared_res)
+    : shared_res_{shared_res}, res_{rmm::cuda_stream_view(get_stream_from_global_pool())}
  {
  }

 /** Default constructor creates all resources anew. */
-  configured_raft_resources()
-    : configured_raft_resources{
-        {[]() {
-          auto* mr =
-            new device_mr_t{rmm::mr::get_current_device_resource(), 1024 * 1024 * 1024ull};
-          rmm::mr::set_current_device_resource(mr);
-          return mr;
-        }(),
-        [](device_mr_t* mr) {
-          if (mr == nullptr) { return; }
-          auto* cur_mr = dynamic_cast<device_mr_t*>(rmm::mr::get_current_device_resource());
-          if (cur_mr != nullptr && (*cur_mr) == (*mr)) {
-            // Normally, we'd always want to set the rmm resource back to the upstream of the pool
-            // here. However, we expect some implementations may be buggy and mess up the rmm
-            // resource, especially during development. This extra check here adds a little bit of
-            // resilience: let the program crash/fail somewhere else rather than in the destructor
-            // of the shared pointer.
- rmm::mr::set_current_device_resource(mr->get_upstream()); - } - delete mr; - }}} + configured_raft_resources() : configured_raft_resources{std::make_shared()} { } @@ -107,37 +134,24 @@ class configured_raft_resources { configured_raft_resources& operator=(configured_raft_resources&&) = default; ~configured_raft_resources() = default; configured_raft_resources(const configured_raft_resources& res) - : configured_raft_resources{res.mr_} + : configured_raft_resources{res.shared_res_} { } configured_raft_resources& operator=(const configured_raft_resources& other) { - this->mr_ = other.mr_; + this->shared_res_ = other.shared_res_; return *this; } operator raft::resources&() noexcept { return res_; } operator const raft::resources&() const noexcept { return res_; } - /** Make the given stream wait on all work submitted to the resource. */ - void stream_wait(cudaStream_t stream) const - { - RAFT_CUDA_TRY(cudaEventRecord(*sync_, resource::get_cuda_stream(res_))); - RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, *sync_)); - } - - /** Get the internal sync event (which otherwise used only in `stream_wait`). */ - cudaEvent_t get_sync_event() const { return *sync_; } + /** Get the main stream */ + [[nodiscard]] auto get_sync_stream() const noexcept { return resource::get_cuda_stream(res_); } private: - /** - * This pool is set as the RMM current device, hence its shared among all users of RMM resources. - * Its lifetime must be longer than that of any other cuda resources. It's not exposed and not - * used by anyone directly. - */ - std::shared_ptr mr_; - /** Each benchmark wrapper must have its own copy of the synchronization event. */ - std::unique_ptr> sync_; + /** The resources shared among multiple raft handles / threads. */ + std::shared_ptr shared_res_; /** * Until we make the use of copies of raft::resources thread-safe, each benchmark wrapper must * have its own copy of it. 
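For context, a usage sketch (not part of the diff; `resources_usage_sketch` is hypothetical and assumes a valid CUDA context) of how the two resource types above compose: copies of `configured_raft_resources` share one `shared_raft_resources` (and therefore one RMM pool), while each copy owns a fresh `raft::resources` bound to a per-thread stream from the global pool.

```cpp
// Sketch: the first handle creates the shared pool; copies share it.
inline void resources_usage_sketch()
{
  configured_raft_resources handle{};         // makes shared_raft_resources + RMM pool
  configured_raft_resources worker{handle};   // shares the pool, fresh local raft::resources
  cudaStream_t s = worker.get_sync_stream();  // stream that cuda_timer records events on
  static_cast<void>(s);
}
```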
diff --git a/cpp/bench/ann/src/raft/raft_benchmark.cu b/cpp/bench/ann/src/raft/raft_benchmark.cu index 37f5fb674b..8bb4d9423c 100644 --- a/cpp/bench/ann/src/raft/raft_benchmark.cu +++ b/cpp/bench/ann/src/raft/raft_benchmark.cu @@ -58,10 +58,13 @@ std::unique_ptr> create_algo(const std::string& algo, if constexpr (std::is_same_v) {} #ifdef RAFT_ANN_BENCH_USE_RAFT_IVF_FLAT - if (algo == "raft_ivf_flat") { - typename raft::bench::ann::RaftIvfFlatGpu::BuildParam param; - parse_build_param(conf, param); - ann = std::make_unique>(metric, dim, param); + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + if (algo == "raft_ivf_flat") { + typename raft::bench::ann::RaftIvfFlatGpu::BuildParam param; + parse_build_param(conf, param); + ann = std::make_unique>(metric, dim, param); + } } #endif #ifdef RAFT_ANN_BENCH_USE_RAFT_IVF_PQ @@ -95,11 +98,14 @@ std::unique_ptr::AnnSearchParam> create_search } #endif #ifdef RAFT_ANN_BENCH_USE_RAFT_IVF_FLAT - if (algo == "raft_ivf_flat") { - auto param = - std::make_unique::SearchParam>(); - parse_search_param(conf, *param); - return param; + if constexpr (std::is_same_v || std::is_same_v || + std::is_same_v) { + if (algo == "raft_ivf_flat") { + auto param = + std::make_unique::SearchParam>(); + parse_search_param(conf, *param); + return param; + } } #endif #ifdef RAFT_ANN_BENCH_USE_RAFT_IVF_PQ @@ -124,6 +130,7 @@ std::unique_ptr::AnnSearchParam> create_search }; // namespace raft::bench::ann REGISTER_ALGO_INSTANCE(float); +REGISTER_ALGO_INSTANCE(half); REGISTER_ALGO_INSTANCE(std::int8_t); REGISTER_ALGO_INSTANCE(std::uint8_t); diff --git a/cpp/bench/ann/src/raft/raft_cagra.cu b/cpp/bench/ann/src/raft/raft_cagra.cu index be18af7f2c..c0c1352a43 100644 --- a/cpp/bench/ann/src/raft/raft_cagra.cu +++ b/cpp/bench/ann/src/raft/raft_cagra.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,5 +18,6 @@ namespace raft::bench::ann { template class RaftCagra; template class RaftCagra; +template class RaftCagra; template class RaftCagra; } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/raft/raft_cagra_hnswlib_wrapper.h b/cpp/bench/ann/src/raft/raft_cagra_hnswlib_wrapper.h index 3fd0a374b7..2018815c1e 100644 --- a/cpp/bench/ann/src/raft/raft_cagra_hnswlib_wrapper.h +++ b/cpp/bench/ann/src/raft/raft_cagra_hnswlib_wrapper.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -22,7 +22,7 @@ namespace raft::bench::ann { template -class RaftCagraHnswlib : public ANN { +class RaftCagraHnswlib : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; using BuildParam = typename RaftCagra::BuildParam; @@ -36,18 +36,19 @@ class RaftCagraHnswlib : public ANN { { } - void build(const T* dataset, size_t nrow, cudaStream_t stream) final; + void build(const T* dataset, size_t nrow) final; void set_search_param(const AnnSearchParam& param) override; // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override; + + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override + { + return cagra_build_.get_sync_stream(); + } // to enable dataset access from GPU memory AlgoProperty get_preference() const override @@ -71,9 +72,9 @@ class RaftCagraHnswlib : public ANN { }; template -void RaftCagraHnswlib::build(const T* dataset, size_t nrow, cudaStream_t stream) +void RaftCagraHnswlib::build(const T* dataset, size_t nrow) { - cagra_build_.build(dataset, nrow, stream); + cagra_build_.build(dataset, nrow); } template @@ -96,14 +97,10 @@ void RaftCagraHnswlib::load(const std::string& file) } template -void RaftCagraHnswlib::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void RaftCagraHnswlib::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { - hnswlib_search_.search(queries, batch_size, k, neighbors, distances, stream); + hnswlib_search_.search(queries, batch_size, k, neighbors, distances); } } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/raft/raft_cagra_wrapper.h b/cpp/bench/ann/src/raft/raft_cagra_wrapper.h index fe07d02452..ba4936976e 100755 --- a/cpp/bench/ann/src/raft/raft_cagra_wrapper.h +++ b/cpp/bench/ann/src/raft/raft_cagra_wrapper.h @@ -52,7 +52,7 @@ namespace raft::bench::ann { enum class AllocatorType { HostPinned, HostHugePage, Device }; template -class RaftCagra : public ANN { +class RaftCagra : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; @@ -90,7 +90,7 @@ class RaftCagra : public ANN { index_params_.ivf_pq_build_params->metric = parse_metric_type(metric); } - void build(const T* dataset, size_t nrow, cudaStream_t stream) final; + void build(const T* dataset, size_t nrow) final; void set_search_param(const AnnSearchParam& param) override; @@ -98,12 +98,13 @@ class RaftCagra : public ANN { // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override; + + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override + { + return handle_.get_sync_stream(); + } // to enable dataset access from GPU memory AlgoProperty get_preference() const override @@ -145,7 +146,7 @@ class RaftCagra : public ANN { }; template -void RaftCagra::build(const T* dataset, size_t nrow, cudaStream_t stream) +void RaftCagra::build(const T* dataset, size_t nrow) { auto 
dataset_view = raft::make_host_matrix_view(dataset, IdxT(nrow), dimension_); @@ -161,8 +162,6 @@ void RaftCagra::build(const T* dataset, size_t nrow, cudaStream_t strea index_params_.ivf_pq_build_params, index_params_.ivf_pq_search_params, false))); - - handle_.stream_wait(stream); // RAFT stream -> bench stream } inline std::string allocator_to_string(AllocatorType mem_type) @@ -185,7 +184,7 @@ void RaftCagra::set_search_param(const AnnSearchParam& param) if (search_param.graph_mem != graph_mem_) { // Move graph to correct memory space graph_mem_ = search_param.graph_mem; - RAFT_LOG_INFO("moving graph to new memory space: %s", allocator_to_string(graph_mem_).c_str()); + RAFT_LOG_DEBUG("moving graph to new memory space: %s", allocator_to_string(graph_mem_).c_str()); // We create a new graph and copy to it from existing graph auto mr = get_mr(graph_mem_); auto new_graph = make_device_mdarray( @@ -209,19 +208,16 @@ void RaftCagra::set_search_param(const AnnSearchParam& param) index_->update_dataset(handle_, make_const_mdspan(dataset_->view())); // Allocate space using the correct memory resource. - RAFT_LOG_INFO("moving dataset to new memory space: %s", - allocator_to_string(dataset_mem_).c_str()); + RAFT_LOG_DEBUG("moving dataset to new memory space: %s", + allocator_to_string(dataset_mem_).c_str()); auto mr = get_mr(dataset_mem_); raft::neighbors::cagra::detail::copy_with_padding(handle_, *dataset_, *input_dataset_v_, mr); - index_->update_dataset(handle_, make_const_mdspan(dataset_->view())); + auto dataset_view = raft::make_device_strided_matrix_view( + dataset_->data_handle(), dataset_->extent(0), this->dim_, dataset_->extent(1)); + index_->update_dataset(handle_, dataset_view); - // Ideally, instead of dataset_.view(), we should pass a strided matrix view to update. - // See Issue https://github.com/rapidsai/raft/issues/1972 for details. 
- // auto dataset_view = make_device_strided_matrix_view( - // dataset_.data_handle(), dataset_.extent(0), this->dim_, dataset_.extent(1)); - // index_->update_dataset(handle_, dataset_view); need_dataset_update_ = false; } } @@ -264,16 +260,12 @@ std::unique_ptr> RaftCagra::copy() } template -void RaftCagra::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void RaftCagra::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { IdxT* neighbors_IdxT; rmm::device_uvector neighbors_storage(0, resource::get_cuda_stream(handle_)); - if constexpr (std::is_same::value) { + if constexpr (std::is_same_v) { neighbors_IdxT = neighbors; } else { neighbors_storage.resize(batch_size * k, resource::get_cuda_stream(handle_)); @@ -288,14 +280,12 @@ void RaftCagra::search(const T* queries, raft::neighbors::cagra::search( handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); - if (!std::is_same::value) { + if constexpr (!std::is_same_v) { raft::linalg::unaryOp(neighbors, neighbors_IdxT, batch_size * k, raft::cast_op(), raft::resource::get_cuda_stream(handle_)); } - - handle_.stream_wait(stream); // RAFT stream -> bench stream } } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/raft/raft_ivf_flat_wrapper.h b/cpp/bench/ann/src/raft/raft_ivf_flat_wrapper.h index 06ee355ae7..7f2996d77a 100644 --- a/cpp/bench/ann/src/raft/raft_ivf_flat_wrapper.h +++ b/cpp/bench/ann/src/raft/raft_ivf_flat_wrapper.h @@ -39,7 +39,7 @@ namespace raft::bench::ann { template -class RaftIvfFlatGpu : public ANN { +class RaftIvfFlatGpu : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; @@ -57,18 +57,19 @@ class RaftIvfFlatGpu : public ANN { RAFT_CUDA_TRY(cudaGetDevice(&device_)); } - void build(const T* dataset, size_t nrow, cudaStream_t stream) final; + void build(const T* dataset, size_t nrow) final; void set_search_param(const AnnSearchParam& param) override; // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override; + + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override + { + return handle_.get_sync_stream(); + } // to enable dataset access from GPU memory AlgoProperty get_preference() const override @@ -93,11 +94,10 @@ class RaftIvfFlatGpu : public ANN { }; template -void RaftIvfFlatGpu::build(const T* dataset, size_t nrow, cudaStream_t stream) +void RaftIvfFlatGpu::build(const T* dataset, size_t nrow) { index_ = std::make_shared>(std::move( raft::neighbors::ivf_flat::build(handle_, index_params_, dataset, IdxT(nrow), dimension_))); - handle_.stream_wait(stream); // RAFT stream -> bench stream } template @@ -130,16 +130,11 @@ std::unique_ptr> RaftIvfFlatGpu::copy() } template -void RaftIvfFlatGpu::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const +void RaftIvfFlatGpu::search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const { static_assert(sizeof(size_t) == sizeof(IdxT), "IdxT is incompatible with size_t"); raft::neighbors::ivf_flat::search( handle_, search_params_, *index_, queries, batch_size, k, (IdxT*)neighbors, 
distances); - handle_.stream_wait(stream); // RAFT stream -> bench stream } } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/raft/raft_ivf_pq.cu b/cpp/bench/ann/src/raft/raft_ivf_pq.cu index 2efe14631b..d4f68c1c7d 100644 --- a/cpp/bench/ann/src/raft/raft_ivf_pq.cu +++ b/cpp/bench/ann/src/raft/raft_ivf_pq.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ namespace raft::bench::ann { template class RaftIvfPQ; +template class RaftIvfPQ; template class RaftIvfPQ; template class RaftIvfPQ; } // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h b/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h index 17e324f918..5d8b682264 100644 --- a/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h +++ b/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h @@ -27,10 +27,10 @@ #include #include #include +#include #include +#include #include -#include -#include #include #include @@ -40,7 +40,7 @@ namespace raft::bench::ann { template -class RaftIvfPQ : public ANN { +class RaftIvfPQ : public ANN, public AnnGPU { public: using typename ANN::AnnSearchParam; using ANN::dim_; @@ -59,19 +59,20 @@ class RaftIvfPQ : public ANN { index_params_.metric = parse_metric_type(metric); } - void build(const T* dataset, size_t nrow, cudaStream_t stream) final; + void build(const T* dataset, size_t nrow) final; void set_search_param(const AnnSearchParam& param) override; void set_search_dataset(const T* dataset, size_t nrow) override; // TODO: if the number of results is less than k, the remaining elements of 'neighbors' // will be filled with (size_t)-1 - void search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream = 0) const override; + void search( + const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override; + + [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override + { + return handle_.get_sync_stream(); + } // to enable dataset access from GPU memory AlgoProperty get_preference() const override @@ -99,26 +100,23 @@ class RaftIvfPQ : public ANN { template void RaftIvfPQ::save(const std::string& file) const { - raft::runtime::neighbors::ivf_pq::serialize(handle_, file, *index_); + raft::neighbors::ivf_pq::serialize(handle_, file, *index_); } template void RaftIvfPQ::load(const std::string& file) { - std::make_shared>(handle_, index_params_, dimension_) - .swap(index_); - raft::runtime::neighbors::ivf_pq::deserialize(handle_, file, index_.get()); - return; + index_ = std::make_shared>( + std::move(raft::neighbors::ivf_pq::deserialize(handle_, file))); } template -void RaftIvfPQ::build(const T* dataset, size_t nrow, cudaStream_t stream) +void RaftIvfPQ::build(const T* dataset, size_t nrow) { auto dataset_v = raft::make_device_matrix_view(dataset, IdxT(nrow), dim_); std::make_shared>( - std::move(raft::runtime::neighbors::ivf_pq::build(handle_, index_params_, dataset_v))) + std::move(raft::neighbors::ivf_pq::build(handle_, index_params_, dataset_v))) .swap(index_); - handle_.stream_wait(stream); // RAFT stream -> bench stream } template @@ -143,21 +141,17 @@ void RaftIvfPQ::set_search_dataset(const T* dataset, size_t nrow) } template -void RaftIvfPQ::search(const T* queries, - int batch_size, - int k, - size_t* neighbors, - float* distances, - cudaStream_t stream) const 
diff --git a/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h b/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h
index 17e324f918..5d8b682264 100644
--- a/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h
+++ b/cpp/bench/ann/src/raft/raft_ivf_pq_wrapper.h
@@ -27,10 +27,10 @@
 #include
 #include
 #include
+#include <raft/neighbors/ivf_pq.cuh>
 #include
+#include <raft/neighbors/refine.cuh>
 #include
-#include <raft_runtime/neighbors/ivf_pq.hpp>
-#include <raft_runtime/neighbors/refine.hpp>
 #include
 #include
@@ -40,7 +40,7 @@
 namespace raft::bench::ann {

 template <typename T, typename IdxT>
-class RaftIvfPQ : public ANN<T> {
+class RaftIvfPQ : public ANN<T>, public AnnGPU {
  public:
  using typename ANN<T>::AnnSearchParam;
  using ANN<T>::dim_;
@@ -59,19 +59,20 @@ class RaftIvfPQ : public ANN<T> {
    index_params_.metric = parse_metric_type(metric);
  }

-  void build(const T* dataset, size_t nrow, cudaStream_t stream) final;
+  void build(const T* dataset, size_t nrow) final;

  void set_search_param(const AnnSearchParam& param) override;
  void set_search_dataset(const T* dataset, size_t nrow) override;

  // TODO: if the number of results is less than k, the remaining elements of 'neighbors'
  // will be filled with (size_t)-1
-  void search(const T* queries,
-              int batch_size,
-              int k,
-              size_t* neighbors,
-              float* distances,
-              cudaStream_t stream = 0) const override;
+  void search(
+    const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const override;
+
+  [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override
+  {
+    return handle_.get_sync_stream();
+  }

  // to enable dataset access from GPU memory
  AlgoProperty get_preference() const override
@@ -99,26 +100,23 @@ class RaftIvfPQ : public ANN<T> {
 template <typename T, typename IdxT>
 void RaftIvfPQ<T, IdxT>::save(const std::string& file) const
 {
-  raft::runtime::neighbors::ivf_pq::serialize(handle_, file, *index_);
+  raft::neighbors::ivf_pq::serialize(handle_, file, *index_);
 }

 template <typename T, typename IdxT>
 void RaftIvfPQ<T, IdxT>::load(const std::string& file)
 {
-  std::make_shared<raft::neighbors::ivf_pq::index<IdxT>>(handle_, index_params_, dimension_)
-    .swap(index_);
-  raft::runtime::neighbors::ivf_pq::deserialize(handle_, file, index_.get());
-  return;
+  index_ = std::make_shared<raft::neighbors::ivf_pq::index<IdxT>>(
+    std::move(raft::neighbors::ivf_pq::deserialize(handle_, file)));
 }

 template <typename T, typename IdxT>
-void RaftIvfPQ<T, IdxT>::build(const T* dataset, size_t nrow, cudaStream_t stream)
+void RaftIvfPQ<T, IdxT>::build(const T* dataset, size_t nrow)
 {
  auto dataset_v = raft::make_device_matrix_view<const T, IdxT>(dataset, IdxT(nrow), dim_);

  std::make_shared<raft::neighbors::ivf_pq::index<IdxT>>(
-    std::move(raft::runtime::neighbors::ivf_pq::build(handle_, index_params_, dataset_v)))
+    std::move(raft::neighbors::ivf_pq::build(handle_, index_params_, dataset_v)))
    .swap(index_);
-  handle_.stream_wait(stream);  // RAFT stream -> bench stream
 }

 template <typename T, typename IdxT>
@@ -143,21 +141,17 @@ void RaftIvfPQ<T, IdxT>::set_search_dataset(const T* dataset, size_t nrow)
 }

 template <typename T, typename IdxT>
-void RaftIvfPQ<T, IdxT>::search(const T* queries,
-                                int batch_size,
-                                int k,
-                                size_t* neighbors,
-                                float* distances,
-                                cudaStream_t stream) const
+void RaftIvfPQ<T, IdxT>::search(
+  const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const
 {
  if (refine_ratio_ > 1.0f) {
    uint32_t k0    = static_cast<uint32_t>(refine_ratio_ * k);
    auto queries_v =
-      raft::make_device_matrix_view(queries, batch_size, index_->dim());
-    auto distances_tmp = raft::make_device_matrix(handle_, batch_size, k0);
-    auto candidates    = raft::make_device_matrix(handle_, batch_size, k0);
+      raft::make_device_matrix_view(queries, batch_size, index_->dim());
+    auto distances_tmp = raft::make_device_matrix(handle_, batch_size, k0);
+    auto candidates    = raft::make_device_matrix(handle_, batch_size, k0);

-    raft::runtime::neighbors::ivf_pq::search(
+    raft::neighbors::ivf_pq::search(
      handle_, search_params_, *index_, queries_v, candidates.view(), distances_tmp.view());

    if (raft::get_device_for_address(dataset_.data_handle()) >= 0) {
@@ -166,53 +160,48 @@ void RaftIvfPQ<T, IdxT>::search(const T* queries,
      auto neighbors_v = raft::make_device_matrix_view((IdxT*)neighbors, batch_size, k);
      auto distances_v = raft::make_device_matrix_view(distances, batch_size, k);

-      raft::runtime::neighbors::refine(handle_,
-                                       dataset_,
-                                       queries_v,
-                                       candidates.view(),
-                                       neighbors_v,
-                                       distances_v,
-                                       index_->metric());
-      handle_.stream_wait(stream);  // RAFT stream -> bench stream
+      raft::neighbors::refine(handle_,
+                              dataset_,
+                              queries_v,
+                              candidates.view(),
+                              neighbors_v,
+                              distances_v,
+                              index_->metric());
    } else {
      auto queries_host    = raft::make_host_matrix(batch_size, index_->dim());
      auto candidates_host = raft::make_host_matrix(batch_size, k0);
      auto neighbors_host  = raft::make_host_matrix(batch_size, k);
      auto distances_host  = raft::make_host_matrix(batch_size, k);
+      auto stream          = resource::get_cuda_stream(handle_);

      raft::copy(queries_host.data_handle(), queries, queries_host.size(), stream);
-      raft::copy(candidates_host.data_handle(),
-                 candidates.data_handle(),
-                 candidates_host.size(),
-                 resource::get_cuda_stream(handle_));
+      raft::copy(
+        candidates_host.data_handle(), candidates.data_handle(), candidates_host.size(), stream);

      auto dataset_v = raft::make_host_matrix_view(
        dataset_.data_handle(), dataset_.extent(0), dataset_.extent(1));

-      // wait for the queries to copy to host in 'stream' and for IVF-PQ::search to finish
-      RAFT_CUDA_TRY(cudaEventRecord(handle_.get_sync_event(), resource::get_cuda_stream(handle_)));
-      RAFT_CUDA_TRY(cudaEventRecord(handle_.get_sync_event(), stream));
-      RAFT_CUDA_TRY(cudaEventSynchronize(handle_.get_sync_event()));
-      raft::runtime::neighbors::refine(handle_,
-                                       dataset_v,
-                                       queries_host.view(),
-                                       candidates_host.view(),
-                                       neighbors_host.view(),
-                                       distances_host.view(),
-                                       index_->metric());
+      raft::resource::sync_stream(handle_);  // wait for the queries and candidates
+      raft::neighbors::refine(handle_,
+                              dataset_v,
+                              queries_host.view(),
+                              candidates_host.view(),
+                              neighbors_host.view(),
+                              distances_host.view(),
+                              index_->metric());

      raft::copy(neighbors, (size_t*)neighbors_host.data_handle(), neighbors_host.size(), stream);
      raft::copy(distances, distances_host.data_handle(), distances_host.size(), stream);
    }
  } else {
    auto queries_v =
-      raft::make_device_matrix_view(queries, batch_size, index_->dim());
-    auto neighbors_v = raft::make_device_matrix_view((IdxT*)neighbors, batch_size, k);
-    auto distances_v = raft::make_device_matrix_view(distances, batch_size, k);
+      raft::make_device_matrix_view(queries, batch_size, index_->dim());
+    auto neighbors_v =
+      raft::make_device_matrix_view((IdxT*)neighbors, batch_size, k);
+    auto distances_v =
+      raft::make_device_matrix_view(distances, batch_size, k);

-    raft::runtime::neighbors::ivf_pq::search(
+    raft::neighbors::ivf_pq::search(
      handle_, search_params_, *index_, queries_v, neighbors_v, distances_v);
-    handle_.stream_wait(stream);  // RAFT stream -> bench stream
  }
 }

 }  // namespace raft::bench::ann
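In the host-side refine branch above, `raft::copy` enqueues asynchronous device-to-host copies on the handle's stream, so a single `raft::resource::sync_stream(handle_)` is enough to make the staged queries and candidates safe to read on the CPU; it replaces the older record/record/synchronize event sequence. A plain CUDA sketch of the same rule, with hypothetical names (`stage_on_host`, `d_candidates`):

#include <cuda_runtime.h>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stage device-side candidate ids on the host before CPU-side refinement.
std::vector<int64_t> stage_on_host(const int64_t* d_candidates, std::size_t count, cudaStream_t stream)
{
  std::vector<int64_t> h_candidates(count);
  cudaMemcpyAsync(h_candidates.data(), d_candidates, count * sizeof(int64_t),
                  cudaMemcpyDeviceToHost, stream);
  // The copy is asynchronous: without this synchronization the host could read
  // h_candidates before the transfer finishes. This is the guarantee that
  // raft::resource::sync_stream(handle_) provides in the branch above.
  cudaStreamSynchronize(stream);
  return h_candidates;
}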
diff --git a/cpp/bench/ann/src/raft/raft_wrapper.h b/cpp/bench/ann/src/raft/raft_wrapper.h
index eae615cba1..6fcb869d38 100644
--- a/cpp/bench/ann/src/raft/raft_wrapper.h
+++ b/cpp/bench/ann/src/raft/raft_wrapper.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -45,24 +45,20 @@ namespace raft::bench::ann {

 // brute force KNN - RAFT
 template <typename T>
-class RaftGpu : public ANN<T> {
+class RaftGpu : public ANN<T>, public AnnGPU {
  public:
  using typename ANN<T>::AnnSearchParam;

  RaftGpu(Metric metric, int dim);

-  void build(const T*, size_t, cudaStream_t) final;
+  void build(const T*, size_t) final;

  void set_search_param(const AnnSearchParam& param) override;

  // TODO: if the number of results is less than k, the remaining elements of 'neighbors'
  // will be filled with (size_t)-1
-  void search(const T* queries,
-              int batch_size,
-              int k,
-              size_t* neighbors,
-              float* distances,
-              cudaStream_t stream = 0) const final;
+  void search(
+    const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const final;

  // to enable dataset access from GPU memory
  AlgoProperty get_preference() const override
@@ -72,6 +68,10 @@ class RaftGpu : public ANN<T> {
    property.query_memory_type = MemoryType::Device;
    return property;
  }
+  [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override
+  {
+    return handle_.get_sync_stream();
+  }
  void set_search_dataset(const T* dataset, size_t nrow) override;
  void save(const std::string& file) const override;
  void load(const std::string&) override;
@@ -97,13 +97,11 @@ RaftGpu<T>::RaftGpu(Metric metric, int dim)
 }

 template <typename T>
-void RaftGpu<T>::build(const T* dataset, size_t nrow, cudaStream_t stream)
+void RaftGpu<T>::build(const T* dataset, size_t nrow)
 {
  auto dataset_view = raft::make_host_matrix_view(dataset, nrow, this->dim_);
  index_ = std::make_shared<raft::neighbors::brute_force::index<T>>(
    std::move(raft::neighbors::brute_force::build(handle_, dataset_view)));
-
-  handle_.stream_wait(stream);
 }

 template <typename T>
@@ -133,12 +131,8 @@ void RaftGpu<T>::load(const std::string& file)
 }

 template <typename T>
-void RaftGpu<T>::search(const T* queries,
-                        int batch_size,
-                        int k,
-                        size_t* neighbors,
-                        float* distances,
-                        cudaStream_t stream) const
+void RaftGpu<T>::search(
+  const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const
 {
  auto queries_view =
    raft::make_device_matrix_view(queries, batch_size, this->dim_);
@@ -148,8 +142,6 @@ void RaftGpu<T>::search(const T* queries,
  raft::neighbors::brute_force::search(
    handle_, *index_, queries_view, neighbors_view, distances_view);
-
-  handle_.stream_wait(stream);
 }

 template <typename T>
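Both wrappers in this diff follow the same shape: inherit `ANN<T>` for the algorithm interface and `AnnGPU` for the stream hook. Below is a skeleton of a new GPU-backed wrapper under the updated interface, assuming the declarations from `cpp/bench/ann/src/common/ann_types.hpp`; `MyGpuAlgo` is hypothetical, the stubs are intentionally empty, and any further virtuals of `ANN<T>` (e.g. `set_search_dataset`) are elided:

#include <cuda_runtime.h>
#include <cstddef>
#include <string>

// Hypothetical wrapper: owns one stream, enqueues all GPU work on it, and
// reports it via get_sync_stream() so the harness can time with events.
template <typename T>
class MyGpuAlgo : public ANN<T>, public AnnGPU {
 public:
  using typename ANN<T>::AnnSearchParam;

  MyGpuAlgo(Metric metric, int dim) : ANN<T>(metric, dim)
  {
    cudaStreamCreate(&stream_);  // all GPU work goes through this one stream
  }
  ~MyGpuAlgo() noexcept override { cudaStreamDestroy(stream_); }

  // No stream parameters and no trailing sync: the harness synchronizes
  // through get_sync_stream() when it needs to.
  void build(const T* dataset, size_t nrow) override { /* enqueue on stream_ */ }
  void search(const T* queries, int batch_size, int k, size_t* neighbors, float* distances)
    const override { /* enqueue on stream_ */ }

  [[nodiscard]] auto get_sync_stream() const noexcept -> cudaStream_t override { return stream_; }

  void set_search_param(const AnnSearchParam&) override {}
  void save(const std::string&) const override {}
  void load(const std::string&) override {}

  AlgoProperty get_preference() const override
  {
    AlgoProperty p;
    p.dataset_memory_type = MemoryType::Device;
    p.query_memory_type   = MemoryType::Device;
    return p;
  }

 private:
  cudaStream_t stream_{nullptr};
};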
diff --git a/cpp/include/raft/neighbors/cagra_types.hpp b/cpp/include/raft/neighbors/cagra_types.hpp
index 00c363b377..cc76d70fbf 100644
--- a/cpp/include/raft/neighbors/cagra_types.hpp
+++ b/cpp/include/raft/neighbors/cagra_types.hpp
@@ -280,6 +280,14 @@ struct index : ann::index {
    }
  }

+  /** Set the dataset reference explicitly to a device matrix view with padding. */
+  void update_dataset(raft::resources const&,
+                      raft::device_matrix_view<const T, int64_t, layout_stride> dataset)
+  {
+    RAFT_EXPECTS(dataset.stride(0) * sizeof(T) % 16 == 0, "Incorrect data padding.");
+    dataset_view_ = dataset;
+  }
+
  /**
   * Replace the dataset with a new dataset.
   *
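The `RAFT_EXPECTS` check above accepts a strided view only when each row begins on a 16-byte boundary, i.e. `stride(0) * sizeof(T)` must be divisible by 16. A small self-contained sketch of computing a compatible row stride; `padded_row_stride` is an illustrative helper, assuming `sizeof(T)` divides 16:

#include <cstddef>

// Round the row length up so every row starts on a 16-byte boundary,
// matching the RAFT_EXPECTS condition in update_dataset above.
// Assumes sizeof(T) divides 16 (true for 1/2/4/8-byte element types).
template <typename T>
constexpr std::size_t padded_row_stride(std::size_t dim)
{
  constexpr std::size_t elems_per_16b = 16 / sizeof(T);
  return (dim + elems_per_16b - 1) / elems_per_16b * elems_per_16b;
}

// float rows of length 99 get padded to 100 elements = 400 bytes (16 | 400);
// a layout_stride view built with this stride passes the check.
static_assert(padded_row_stride<float>(99) == 100, "");
static_assert(padded_row_stride<float>(99) * sizeof(float) % 16 == 0, "");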