Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ANN bench latency #1940

Merged
merged 23 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
98152ff
Define latency, add sync, add FixLatencyWorkload
tfeher Oct 31, 2023
9604b62
Remove manual timing from within the loop
tfeher Oct 31, 2023
884b0bc
Update timer description
tfeher Oct 31, 2023
fb81d3f
Add configuration for ANN benchmark tests
tfeher Oct 31, 2023
b4f37f1
update doc
tfeher Nov 1, 2023
5186c12
Add --threads benchmark arg
tfeher Nov 2, 2023
dc8fc0f
Add docstring for --threads arg
tfeher Nov 2, 2023
274283c
Merge remote-tracking branch 'origin/branch-23.12' into fix_ann_bench…
tfeher Nov 2, 2023
fefc16c
remove fixed_latency_workload that was used for debugging
tfeher Nov 2, 2023
52670f7
restore raft_benchmark.cu
tfeher Nov 2, 2023
c310095
Extend comments on synchronization
tfeher Nov 2, 2023
777ded7
Fix query_set initialization
tfeher Nov 2, 2023
71659a2
Adding search-threads option
cjnolet Nov 2, 2023
f87430b
Correctly passing search-thread
cjnolet Nov 2, 2023
7645a3d
Using for search
cjnolet Nov 2, 2023
2a8822c
Update __main__.py
cjnolet Nov 3, 2023
4a7aec5
Including cuda_stub
cjnolet Nov 3, 2023
be90181
Merge branch 'fix_ann_bench_latency' of github.com:tfeher/raft into f…
cjnolet Nov 3, 2023
cca71e8
Fixing style
cjnolet Nov 3, 2023
a57700b
Merge branch 'branch-23.12' into fix_ann_bench_latency
cjnolet Nov 3, 2023
d903245
Remove cudaDeviceSynchronize() to fix CPU_ONLY build
tfeher Nov 3, 2023
b19fb44
Removing cudart calls from benchmark.hpp
cjnolet Nov 3, 2023
ab049c3
Merge branch 'fix_ann_bench_latency' of github.com:tfeher/raft into f…
cjnolet Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 71 additions & 36 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
#include <algorithm>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <fstream>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <unistd.h>
#include <vector>

namespace raft::bench::ann {

std::mutex init_mutex;
std::condition_variable cond_var;

static inline std::unique_ptr<AnnBase> current_algo{nullptr};
static inline std::shared_ptr<AlgoProperty> current_algo_props{nullptr};

Expand Down Expand Up @@ -172,8 +176,6 @@ void bench_search(::benchmark::State& state,
std::ptrdiff_t batch_offset = 0;
std::size_t queries_processed = 0;

double total_time = 0;

const auto& sp_json = index.search_params[search_param_ix];

if (state.thread_index() == 0) { dump_parameters(state, sp_json); }
Expand All @@ -185,6 +187,8 @@ void bench_search(::benchmark::State& state,
// Round down the query data to a multiple of the batch size to loop over full batches of data
const std::size_t query_set_size = (dataset->query_set_size() / n_queries) * n_queries;

const T* query_set = nullptr;

if (!file_exists(index.file)) {
state.SkipWithError("Index file is missing. Run the benchmark in the build mode first.");
return;
Expand All @@ -194,6 +198,7 @@ void bench_search(::benchmark::State& state,
* Make sure the first thread loads the algo and dataset
*/
if (state.thread_index() == 0) {
std::lock_guard lk(init_mutex);
// algo is static to cache it between close search runs to save time on index loading
static std::string index_file = "";
if (index.file != index_file) {
Expand Down Expand Up @@ -233,18 +238,26 @@ void bench_search(::benchmark::State& state,
return;
}
}

try {
algo->set_search_param(*search_param);

} catch (const std::exception& ex) {
state.SkipWithError("An error occurred setting search parameters: " + std::string(ex.what()));
return;
}
}

query_set = dataset->query_set(current_algo_props->query_memory_type);
cond_var.notify_all();
} else {
// All other threads will wait for the first thread to initialize the algo.
std::unique_lock lk(init_mutex);
cond_var.wait(lk, [] { return current_algo_props.get() != nullptr; });
// gbench ensures that all threads are synchronized at the start of the benchmark loop.
// We are accessing shared variables (like current_algo, current_algo_probs) before the
// benchmark loop, therefore the synchronization here is necessary.
}
const auto algo_property = *current_algo_props;
const T* query_set = dataset->query_set(algo_property.query_memory_type);
query_set = dataset->query_set(algo_property.query_memory_type);

/**
* Each thread will manage its own outputs
Expand All @@ -265,7 +278,6 @@ void bench_search(::benchmark::State& state,
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();

auto start = std::chrono::high_resolution_clock::now();
// run the search
try {
algo->search(query_set + batch_offset * dataset->dim(),
Expand All @@ -278,24 +290,21 @@ void bench_search(::benchmark::State& state,
state.SkipWithError(std::string(e.what()));
}

auto end = std::chrono::high_resolution_clock::now();

auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
// advance to the next batch
batch_offset = (batch_offset + n_queries) % query_set_size;
queries_processed += n_queries;
state.SetIterationTime(elapsed_seconds.count());
tfeher marked this conversation as resolved.
Show resolved Hide resolved
total_time += elapsed_seconds.count();
}
}
auto end = std::chrono::high_resolution_clock::now();
if (state.thread_index() == 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
state.counters.insert({{"end_to_end", duration}});
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); }
state.counters.insert(
{"Latency", {duration / double(state.iterations()), benchmark::Counter::kAvgThreads}});
tfeher marked this conversation as resolved.
Show resolved Hide resolved

state.SetItemsProcessed(queries_processed);
if (cudart.found()) {
state.counters.insert({{"GPU", gpu_timer.total_time() / double(state.iterations())}});
double gpu_time_per_iteration = gpu_timer.total_time() / (double)state.iterations();
state.counters.insert({"GPU", {gpu_time_per_iteration, benchmark::Counter::kAvgThreads}});
}

// This will be the total number of queries across all threads
Expand Down Expand Up @@ -341,6 +350,7 @@ inline void printf_usage()
" [--index_prefix=<prefix>]\n"
" [--override_kv=<key:value1:value2:...:valueN>]\n"
" [--mode=<latency|throughput>\n"
" [--threads=min[:max]]\n"
" <conf>.json\n"
"\n"
"Note the non-standard benchmark parameters:\n"
Expand All @@ -359,8 +369,12 @@ inline void printf_usage()
" you can use this parameter multiple times to get the Cartesian product of benchmark"
" configs.\n"
" --mode=<latency|throughput>"
" run the benchmarks in latency (accumulate times spent in each batch) or "
" throughput (pipeline batches and measure end-to-end) mode\n");
" run the benchmarks in latency (accumulate times spent in each batch) or "
" throughput (pipeline batches and measure end-to-end) mode\n"
" --threads=min[:max] specify the number threads to use for throughput benchmark."
" Power of 2 values between 'min' and 'max' will be used. If only 'min' is specified,"
" then a single test is run with 'min' threads. By default min=1, max=<num hyper"
" threads>.\n");
}

template <typename T>
Expand All @@ -385,29 +399,23 @@ void register_build(std::shared_ptr<const Dataset<T>> dataset,
template <typename T>
void register_search(std::shared_ptr<const Dataset<T>> dataset,
std::vector<Configuration::Index> indices,
Objective metric_objective)
Objective metric_objective,
const std::vector<int>& threads)
{
for (auto index : indices) {
for (std::size_t i = 0; i < index.search_params.size(); i++) {
auto suf = static_cast<std::string>(index.search_params[i]["override_suffix"]);
index.search_params[i].erase("override_suffix");

int max_threads =
metric_objective == Objective::THROUGHPUT ? std::thread::hardware_concurrency() : 1;

auto* b = ::benchmark::RegisterBenchmark(
index.name + suf, bench_search<T>, index, i, dataset, metric_objective)
->Unit(benchmark::kMillisecond)
->ThreadRange(1, max_threads)

->ThreadRange(threads[0], threads[1])
/**
* The following are important for getting accuracy QPS measurements on both CPU
* and GPU These make sure that
* - `end_to_end` ~ (`Time` * `Iterations`)
* - `items_per_second` ~ (`total_queries` / `end_to_end`)
* - `Time` = `end_to_end` / `Iterations`
*
* - Latency = `Time`
* - Throughput = `items_per_second`
*/
->MeasureProcessCPUTime()
Expand All @@ -424,7 +432,8 @@ void dispatch_benchmark(const Configuration& conf,
std::string data_prefix,
std::string index_prefix,
kv_series override_kv,
Objective metric_objective)
Objective metric_objective,
const std::vector<int>& threads)
{
if (cudart.found()) {
for (auto [key, value] : cuda_info()) {
Expand Down Expand Up @@ -493,7 +502,7 @@ void dispatch_benchmark(const Configuration& conf,
index.search_params = apply_overrides(index.search_params, override_kv);
index.file = combine_path(index_prefix, index.file);
}
register_search<T>(dataset, indices, metric_objective);
register_search<T>(dataset, indices, metric_objective, threads);
}
}

Expand Down Expand Up @@ -525,6 +534,8 @@ inline auto run_main(int argc, char** argv) -> int
std::string index_prefix = "index";
std::string new_override_kv = "";
std::string mode = "latency";
std::string threads_arg_txt = "";
std::vector<int> threads = {1, -1}; // min_thread, max_thread
kv_series override_kv{};

char arg0_default[] = "benchmark"; // NOLINT
Expand All @@ -548,7 +559,18 @@ inline auto run_main(int argc, char** argv) -> int
parse_string_flag(argv[i], "--data_prefix", data_prefix) ||
parse_string_flag(argv[i], "--index_prefix", index_prefix) ||
parse_string_flag(argv[i], "--mode", mode) ||
parse_string_flag(argv[i], "--override_kv", new_override_kv)) {
parse_string_flag(argv[i], "--override_kv", new_override_kv) ||
parse_string_flag(argv[i], "--threads", threads_arg_txt)) {
if (!threads_arg_txt.empty()) {
auto threads_arg = split(threads_arg_txt, ':');
threads[0] = std::stoi(threads_arg[0]);
if (threads_arg.size() > 1) {
threads[1] = std::stoi(threads_arg[1]);
} else {
threads[1] = threads[0];
}
threads_arg_txt = "";
}
if (!new_override_kv.empty()) {
auto kvv = split(new_override_kv, ':');
auto key = kvv[0];
Expand All @@ -570,6 +592,17 @@ inline auto run_main(int argc, char** argv) -> int
Objective metric_objective = Objective::LATENCY;
if (mode == "throughput") { metric_objective = Objective::THROUGHPUT; }

int max_threads =
(metric_objective == Objective::THROUGHPUT) ? std::thread::hardware_concurrency() : 1;
if (threads[1] == -1) threads[1] = max_threads;

if (metric_objective == Objective::LATENCY) {
if (threads[0] != 1 || threads[1] != 1) {
log_warn("Latency mode enabled. Overriding threads arg, running with single thread.");
threads = {1, 1};
}
}

if (build_mode == search_mode) {
log_error("One and only one of --build and --search should be specified");
printf_usage();
Expand All @@ -596,7 +629,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else if (dtype == "uint8") {
dispatch_benchmark<std::uint8_t>(conf,
force_overwrite,
Expand All @@ -605,7 +639,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else if (dtype == "int8") {
dispatch_benchmark<std::int8_t>(conf,
force_overwrite,
Expand All @@ -614,7 +649,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else {
log_error("datatype '%s' is not supported", dtype.c_str());
return -1;
Expand All @@ -629,5 +665,4 @@ inline auto run_main(int argc, char** argv) -> int
current_algo.reset();
return 0;
}

}; // namespace raft::bench::ann
3 changes: 3 additions & 0 deletions cpp/bench/ann/src/common/cuda_stub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ namespace stub {
{
return cudaSuccess;
}
[[gnu::weak, gnu::noinline]] cudaError_t cudaDeviceSynchronize() { return cudaSuccess; }

[[gnu::weak, gnu::noinline]] cudaError_t cudaStreamSynchronize(cudaStream_t pStream)
{
return cudaSuccess;
Expand Down Expand Up @@ -214,6 +216,7 @@ RAFT_DECLARE_CUDART(cudaFree);
RAFT_DECLARE_CUDART(cudaStreamCreate);
RAFT_DECLARE_CUDART(cudaStreamCreateWithFlags);
RAFT_DECLARE_CUDART(cudaStreamDestroy);
RAFT_DECLARE_CUDART(cudaDeviceSynchronize);
RAFT_DECLARE_CUDART(cudaStreamSynchronize);
RAFT_DECLARE_CUDART(cudaEventCreate);
RAFT_DECLARE_CUDART(cudaEventRecord);
Expand Down
33 changes: 19 additions & 14 deletions docs/source/raft_ann_benchmarks.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,22 +397,27 @@ The benchmarks capture several different measurements. The table below describes
| index_size | Number of vectors used to train index |


The table below describes each of the measurements for the index search benchmarks:

| Name | Description |
|------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Benchmark | A name that uniquely identifies the benchmark instance |
| Time | The average runtime for each batch. This is approximately `end_to_end` / `Iterations` |
| CPU | The average `wall-time`. In `throughput` mode, this is the average `wall-time` spent in each thread. |
| Iterations | Total number of batches. This is going to be `total_queres` / `n_queries` |
| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration |
| items_per_second | Total throughput. This is approximately `total_queries` / `end_to_end`. |
| k | Number of neighbors being queried in each iteration |
The table below describes each of the measurements for the index search benchmarks. The most important measurements `Latency`, `items_per_second`, `end_to_end`.

| Name | Description |
|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Benchmark | A name that uniquely identifies the benchmark instance |
| Time | The wall-clock time of a single iteration (batch) divided by the number of threads. |
| CPU | The average CPU time (user + sys time). This does not include idle time (which can also happen while waiting for GPU sync). |
| Iterations | Total number of batches. This is going to be `total_queries` / `n_queries`. |
| GPU | GPU latency of a single batch (seconds). In throughput mode this is averaged over multiple threads. |
| Latency | Latency of a single batch (seconds), calculated from wall-clock time. In throughput mode this is averaged over multiple threads. |
| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration.|
| items_per_second | Total throughput, a.k.a Queries per second (QPS). This is approximately `total_queries` / `end_to_end`. |
| k | Number of neighbors being queried in each iteration |
| end_to_end | Total time taken to run all batches for all iterations |
| n_queries | Total number of query vectors in each batch |
| total_queries | Total number of vectors queries across all iterations |
| n_queries | Total number of query vectors in each batch |
| total_queries | Total number of vectors queries across all iterations ( = `iterations` * `n_queries`) |

Note that the actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked.
Note the following:
- A slightly different method is used to measure `Time` and `end_to_end`. That is why `end_to_end` = `Time` * `Iterations` holds only approximately.
- The actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked.
- Recall calculation: the number of queries processed per test depends on the number of iterations. Because of this, recall can show slight fluctuations if less neighbors are processed then it is available for the benchmark.

## Creating and customizing dataset configurations

Expand Down
20 changes: 19 additions & 1 deletion python/raft-ann-bench/src/raft-ann-bench/run/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def run_build_and_search(
search,
k,
batch_size,
search_threads,
mode="throughput",
):
for executable, ann_executable_path, algo in executables_to_run.keys():
Expand Down Expand Up @@ -128,14 +129,18 @@ def run_build_and_search(
"--benchmark_counters_tabular=true",
"--override_kv=k:%s" % k,
"--override_kv=n_queries:%s" % batch_size,
"--benchmark_min_warmup_time=0.01",
"--benchmark_min_warmup_time=1",
"--benchmark_out_format=json",
"--mode=%s" % mode,
"--benchmark_out="
+ f"{os.path.join(search_folder, f'{algo}.json')}",
]
if force:
cmd = cmd + ["--overwrite"]

if search_threads:
cmd = cmd + ["--threads=%s" % search_threads]

cmd = cmd + [temp_conf_filepath]
subprocess.run(cmd, check=True)

Expand Down Expand Up @@ -243,6 +248,18 @@ def main():
default="latency",
)

parser.add_argument(
"-t",
"--search-threads",
help="specify the number threads to use for throughput benchmark."
" Single value or a pair of min and max separated by ':'. "
"Example --threads=1:4. Power of 2 values between 'min' "
"and 'max' will be used. If only 'min' is specified, then a "
"single test is run with 'min' threads. By default min=1, "
"max=<num hyper threads>.",
default=None,
)

args = parser.parse_args()

# If both build and search are not provided,
Expand Down Expand Up @@ -444,6 +461,7 @@ def add_algo_group(group_list):
search,
k,
batch_size,
args.search_threads,
mode,
)

Expand Down