Bench test spsc (#21)
Cleanup and SPSC performance bench testing, similar to some external bench testing I’ve seen.

- Added a benchmark that is more sane for SPSC, run against two queues: lock-free and lock-based.
- Restructured unit tests.
- Moved away most performance-based unit tests (kept but commented out for now).
KjellKod authored Dec 16, 2023
1 parent afbfb94 commit 7a22fcd
Showing 27 changed files with 1,761 additions and 694 deletions.
14 changes: 14 additions & 0 deletions CMakeLists.txt
@@ -59,6 +59,16 @@ file(GLOB HEADER_FILES ${PROJECT_SRC}/q/*.hpp)

message("source files: ${SRC_FILES}")

if(NOT CMAKE_BUILD_TYPE)
   set(CMAKE_BUILD_TYPE Release CACHE STRING "Build type (default: Release)" FORCE)
endif()

# Set compiler flags for high performance
if(CMAKE_COMPILER_IS_GNUCXX)
   set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
endif()


set(Q_LIBRARY qlib)
add_library(${Q_LIBRARY} SHARED ${SRC_FILES})
set(${Q_LIBRARY}_VERSION_STRING ${VERSION})
@@ -75,3 +85,7 @@ INCLUDE (${Q_SOURCE_DIR}/test/test.cmake)
# EXAMPLES
# ============================================================================
INCLUDE (${Q_SOURCE_DIR}/examples/examples.cmake)
# ==========================================================================
# Benchmark
# ============================================================================
INCLUDE (${Q_SOURCE_DIR}/benchmark/benchmark.cmake)
2 changes: 2 additions & 0 deletions README.md
@@ -13,6 +13,8 @@ _The SPSC is a powerful building block from which you can create more lock-free

Please see [spsc documentation](spsc.md) for details.

## Benchmark testing
Please see [benchmark documentation](benchmark.md) for details.


# NOT YET DOCUMENTED API
32 changes: 32 additions & 0 deletions benchmark.md
@@ -0,0 +1,32 @@
# Scalability Benchmark

## Throughput messages / second
N producer threads push a 4-byte integer onto a queue. M consumer threads pop the integers from the queue. Each producer posts 1,000,000 messages. The total time to send and receive all messages is measured. The test is repeated 20 times.

Maximum, minimum, and mean throughput are shown in the graph.
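
As a rough illustration of how such numbers can be derived, the sketch below turns per-run elapsed times into messages per second and summarizes them as minimum, maximum and mean. The names `throughput_summary` and `summarize` are hypothetical and are not part of the Q library; this is a minimal sketch, assuming each run sends the same number of messages.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper (not part of Q): min / max / mean throughput over several runs.
struct throughput_summary {
   double min_msgs_per_second;
   double max_msgs_per_second;
   double mean_msgs_per_second;
};

inline throughput_summary summarize(const std::vector<uint64_t>& elapsed_ns_per_run,
                                    uint64_t messages_per_run) {
   assert(!elapsed_ns_per_run.empty());
   double min_rate = 0.0;
   double max_rate = 0.0;
   double total_rate = 0.0;
   bool first = true;
   for (uint64_t ns : elapsed_ns_per_run) {
      assert(ns > 0);
      // Throughput of one run: messages divided by elapsed seconds.
      const double seconds = static_cast<double>(ns) / 1e9;
      const double rate = static_cast<double>(messages_per_run) / seconds;
      min_rate = first ? rate : std::min(min_rate, rate);
      max_rate = first ? rate : std::max(max_rate, rate);
      total_rate += rate;
      first = false;
   }
   const double runs = static_cast<double>(elapsed_ns_per_run.size());
   return {min_rate, max_rate, total_rate / runs};
}
```

The mean here is the mean of the per-run rates, which is not the same as total messages divided by total time; either choice is defensible as long as it is stated alongside the results.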

## Example benchmark tests
2018 Macbook Pro 15" 2.6 GHz Intel Core i7, 16 GB 2400 MHz DDR4
**SPSC** testing with spsc lock-free circular queue, and with mutex protected queue.




Average time per ops (push/pop) is shown below.

# Thoughts on performance measurements.
Unless you have a good understanding of how a queue's performance metrics were produced, it's near useless to rely on someone else's performance numbers. Tuning the queue size, staying silent about thread-affinity manipulation (or the lack of it), avoiding worst-case scenarios, and so on are all blatantly common in the queue measurements that I've seen.

If you are interested in comparing Q against something else, think about which scenarios the queue should perform well in. Do your own performance testing of the queue alternatives on the platform where you intend to use them!

You may need a wrapper of sorts (like [q_api](src/q/q_api.hpp)) so you can swap the queues, but it's well worth the effort. Most likely your system will be doing a lot of other tasks, and hyper CPU-intensive [spin locks](https://probablydance.com/2019/12/30/measuring-mutexes-spinlocks-and-how-bad-the-linux-scheduler-really-is/) and platform-specific CPU instructions may well have a negative impact on other performance-sensitive tasks in your software.

_In my opinion, you should have a **really** good motivation before you let a horrendous spin lock into your system._
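
To make the wrapper idea concrete, here is a minimal sketch, assuming a non-blocking `push`/`pop` interface that returns `false` when the queue is full or empty. The adapter `locked_queue_adapter` and the function `push_then_drain` are hypothetical names for illustration; they are not the `q_api` wrapper itself. Test code written against the shared interface can then be reused with any queue that offers the same two calls.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical adapter (not Q's q_api): gives a mutex-protected std::queue
// a bounded, non-blocking push/pop interface.
template <typename T>
class locked_queue_adapter {
 public:
   explicit locked_queue_adapter(std::size_t capacity) : capacity_(capacity) {}

   bool push(const T& item) {
      std::lock_guard<std::mutex> lock(mutex_);
      if (items_.size() >= capacity_) {
         return false;  // full
      }
      items_.push(item);
      return true;
   }

   bool pop(T& item) {
      std::lock_guard<std::mutex> lock(mutex_);
      if (items_.empty()) {
         return false;  // empty
      }
      item = items_.front();
      items_.pop();
      return true;
   }

 private:
   std::mutex mutex_;
   std::queue<T> items_;
   const std::size_t capacity_;
};

// Code under test only depends on push/pop, so the queue type can be exchanged.
template <typename Queue>
unsigned long long push_then_drain(Queue& q, unsigned int count) {
   for (unsigned int i = 1; i <= count; ++i) {
      if (!q.push(i)) {
         break;  // stop at the first rejected item
      }
   }
   unsigned long long sum = 0;
   unsigned int value = 0;
   while (q.pop(value)) {
      sum += value;
   }
   return sum;
}
```

Swapping in another queue then only requires a type with the same `push` and `pop` signatures; the measuring code itself stays untouched.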

# Thoughts on correctness
There are some really amazing, highly efficient C++ queues out there. Most of them are crazy hard to reason about when you read the code, and for that reason it is almost impossible to know whether they are actually correct.

The Q library aims to keep the queue logic extremely easy to understand, possibly at the cost of some infinitesimal performance degradation compared to near-impossible-to-reason-about queues.

The Q library is built on common building blocks and at its core has easy-to-understand logic: the famous [lock-free-circular-fifo](https://kjellkod.wordpress.com/2012/11/28/c-debt-paid-in-full-wait-free-lock-free-queue/). I wrote about this in 2012, and with some minor changes it's pretty much the same root building block as then. There is nothing new about this queue; in fact there are a myriad of similar queues out there.
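
For readers unfamiliar with the idea, the general shape of a single-producer, single-consumer circular FIFO can be sketched as below. This is a textbook-style illustration and not the Q library's actual implementation; the class name `spsc_ring_sketch` is hypothetical. It assumes exactly one thread calls `push` and exactly one thread calls `pop`.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Illustrative SPSC ring buffer (hypothetical, not Q's code).
// One slot is kept unused so that "full" and "empty" can be told apart.
template <typename T, std::size_t Size>
class spsc_ring_sketch {
 public:
   // Called by the single producer thread only.
   bool push(const T& item) {
      const std::size_t tail = tail_.load(std::memory_order_relaxed);
      const std::size_t next = increment(tail);
      if (next == head_.load(std::memory_order_acquire)) {
         return false;  // full
      }
      buffer_[tail] = item;
      tail_.store(next, std::memory_order_release);  // publish the item
      return true;
   }

   // Called by the single consumer thread only.
   bool pop(T& item) {
      const std::size_t head = head_.load(std::memory_order_relaxed);
      if (head == tail_.load(std::memory_order_acquire)) {
         return false;  // empty
      }
      item = buffer_[head];
      head_.store(increment(head), std::memory_order_release);  // free the slot
      return true;
   }

 private:
   static constexpr std::size_t Capacity = Size + 1;
   static std::size_t increment(std::size_t index) { return (index + 1) % Capacity; }

   std::array<T, Capacity> buffer_{};
   std::atomic<std::size_t> head_{0};  // next slot to read
   std::atomic<std::size_t> tail_{0};  // next slot to write
};
```

The producer only writes `tail_` and the consumer only writes `head_`, which is what keeps the logic small enough to reason about.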

18 changes: 18 additions & 0 deletions benchmark/benchmark.cmake
@@ -0,0 +1,18 @@

set(BENCHMARK_MAIN ${Q_SOURCE_DIR}/benchmark/benchmark_main.cpp)
if(NOT CMAKE_BUILD_TYPE)
   set(CMAKE_BUILD_TYPE Release CACHE STRING "Build type (default: Release)" FORCE)
endif()

# Set compiler flags for high performance
if(CMAKE_COMPILER_IS_GNUCXX)
   set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
endif()


file(GLOB BENCHMARK_SRC_FILES "benchmark/*.cpp")
include_directories(${PROJECT_SRC})

add_executable(benchmark ${BENCHMARK_MAIN} ${BENCHMARK_SRC_FILES})
target_link_libraries(benchmark ${Q_LIBRARY})

137 changes: 137 additions & 0 deletions benchmark/benchmark_functions.hpp
@@ -0,0 +1,137 @@
/*
* Not any company's property but Public-Domain
* Do with source-code as you will. No requirement to keep this
* header if need to use it/change it/ or do whatever with it
*
* Note that there is No guarantee that this code will work
* and I take no responsibility for this code and any problems you
* might get if using it.
*
* Originally published at https://github.com/KjellKod/Q
*/

#pragma once
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "q/q_api.hpp"
#include "stopwatch.hpp"

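// Note: assert() compiles to nothing when NDEBUG is defined (e.g. Release builds),
// so a failed check then only prints. In debug builds the checked expression is
// evaluated a second time by assert() after the failure has been printed.
#define Q_CHECK_EQ(value1, value2) \
   do { \
      if ((value1) != (value2)) { \
         std::cout << "Line " << __LINE__ << ": CHECK failed, " \
                   << (value1) << " is not equal with " << (value2) << std::endl; \
         assert((value1) == (value2)); \
      } \
   } while (0)

#define Q_CHECK(expr) \
   do { \
      if (!(expr)) { \
         std::cout << "Line " << __LINE__ << ": CHECK failed, " \
                   << #expr << " is not true" << std::endl; \
         assert((expr)); \
      } \
   } while (0)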

namespace benchmark {
   struct result_t {
      uint64_t total_sum;
      uint64_t elapsed_time_in_ns;
   };

   const std::chrono::milliseconds kMaxWaitMs(1000);

   template <typename Sender>
   result_t Push(Sender q, const size_t stop, std::atomic<bool>& producerStart, std::atomic<bool>& consumerStart) {
      using namespace std::chrono_literals;
      producerStart.store(true);
      while (!consumerStart.load()) {
         std::this_thread::sleep_for(10ns);
      }
      benchmark::stopwatch watch;
      uint64_t sum = 0;
      for (unsigned int i = 1; i <= stop; ++i) {
         Q_CHECK(q.wait_and_push(i, kMaxWaitMs));
         sum += i;
      }
      return {sum, watch.elapsed_ns()};
   }

   template <typename Receiver>
   result_t Get(Receiver q, size_t stop, std::atomic<bool>& producerStart, std::atomic<bool>& consumerStart) {
      using namespace std::chrono_literals;
      consumerStart.store(true);
      while (!producerStart.load()) {
         std::this_thread::sleep_for(10ns);
      }
      benchmark::stopwatch watch;
      uint64_t sum = 0;
      for (;;) {
         unsigned int value = 0;
         Q_CHECK(q.wait_and_pop(value, kMaxWaitMs));
         sum += value;
         if (value == stop) {
            break;
         }
      }
      return {sum, watch.elapsed_ns()};
   }
}  // namespace benchmark

// template <typename Sender>
// size_t PushUntil(Sender q, std::string data,
// std::atomic<size_t>& producerCount,
// std::atomic<bool>& stopRunning) {
// using namespace std::chrono_literals;
// producerCount++;
// using namespace std::chrono_literals;

// benchmark::stopwatch watch;
// size_t amountPushed = 0;
// while (!stopRunning.load(std::memory_order_relaxed)) {
// std::string value = data;
// while (false == q.push(value) && !stopRunning.load(std::memory_order_relaxed)) {
// std::this_thread::sleep_for(100ns); // yield is too aggressive
// }
// ++amountPushed;
// }
// return amountPushed;
// }

// // template <typename Receiver>
// // size_t GetUntil(Receiver q, const std::string data,
// // std::atomic<size_t>& consumerCount,
// // std::atomic<bool>& stopRunning) {
// // using namespace std::chrono_literals;
// // consumerCount++;

// // benchmark::stopwatch watch;
// // size_t amountReceived = 0;
// // size_t byteReceived = 0;
// // while (!stopRunning.load(std::memory_order_relaxed)) {
// // std::string value;
// // bool result = false;
// // std::chrono::milliseconds wait{10};
// // while (!(result = q.wait_and_pop(value, wait))) {
// // if (stopRunning.load(std::memory_order_relaxed)) {
// // break;
// // }
// // }
// // if (result) {
// // EXPECT_EQ(data.size(), value.size());
// // EXPECT_FALSE(value.was_empty());
// // ++amountReceived;
// // byteReceived += value.size();
// // }
// // }
// // std::ostringstream oss;
// // oss << "Bytes received: " << byteReceived << std::endl;
// // std::cout << oss.str();
// // return amountReceived;
// // }
122 changes: 122 additions & 0 deletions benchmark/benchmark_main.cpp
@@ -0,0 +1,122 @@
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <limits>
#include <string>
#include <vector>
#include "benchmark_functions.hpp"
#include "benchmark_runs.hpp"
#include "q/mpmc_lock_queue.hpp"
#include "q/q_api.hpp"
#include "q/spsc_flexible_circular_fifo.hpp"

namespace {
   const size_t kGoodSizedQueueSize = (2 << 16);  // 131072 (2^17)
   const size_t kNumberOfItems = 1000000;

   struct benchmark_result {
      int runs;
      int num_producer_threads;
      int num_consumer_threads;
      size_t messages_per_iteration;
      double mean_msgs_per_second;
      double min_msgs_per_second;
      double max_msgs_per_second;
      std::string comment;
   };
}  // namespace

void print_result(const benchmark_result& result) {
   double average_call_time_ns = result.mean_msgs_per_second ? 1e9 / result.mean_msgs_per_second : 0;

   std::cout << std::left << std::setw(6) << result.runs << ", "
             << std::setw(6) << result.num_producer_threads << ", "
             << std::setw(6) << result.num_consumer_threads << ", "
             << std::setw(12) << result.mean_msgs_per_second << ", "
             << std::setw(12) << result.min_msgs_per_second << ", "
             << std::setw(12) << result.max_msgs_per_second << ", "
             << std::setw(10) << average_call_time_ns << ","
             << std::setw(30) << result.comment << std::endl;
}

template <typename QueueType>
benchmark_result benchmark_queue(const std::string& comment) {
   const int kRuns = 33;
   std::vector<benchmark::result_t> results;
   double total_duration_ns = 0.0;
   double min_msgs_per_second = std::numeric_limits<double>::max();
   // lowest() is the most negative double; min() is the smallest positive one.
   double max_msgs_per_second = std::numeric_limits<double>::lowest();
   double total_msgs_per_second = 0.0;

   for (int i = 0; i < kRuns; ++i) {
      auto queue = queue_api::CreateQueue<QueueType>(kGoodSizedQueueSize);
      auto result = benchmark::runSPSC(queue, kNumberOfItems);
      results.push_back(result);
      total_duration_ns += result.elapsed_time_in_ns;

      double duration_s = result.elapsed_time_in_ns / 1e9;   // convert ns to seconds
      double msgs_per_second = kNumberOfItems / duration_s;  // messages per second for this run
      total_msgs_per_second += msgs_per_second;
      min_msgs_per_second = std::min(min_msgs_per_second, msgs_per_second);
      max_msgs_per_second = std::max(max_msgs_per_second, msgs_per_second);
   }

   double mean_msgs_per_second = total_msgs_per_second / kRuns;  // mean messages per second

   benchmark_result result;
   result.runs = kRuns;
   result.num_producer_threads = 1;
   result.num_consumer_threads = 1;
   result.messages_per_iteration = kNumberOfItems;
   result.mean_msgs_per_second = mean_msgs_per_second;
   result.min_msgs_per_second = min_msgs_per_second;
   result.max_msgs_per_second = max_msgs_per_second;
   result.comment = comment;

   return result;
}

int main() {
   // Print the headers
   std::cout << "#runs,\t#p,\t#c,\t#msgs/s,\t#min_msgs/s,\t#max_msgs/s,\tavg call [ns],\tcomment" << std::endl;

   auto spsc_result = benchmark_queue<spsc::flexible::circular_fifo<unsigned int>>("SPSC benchmark");
   print_result(spsc_result);

   auto spsc_lockqueue_result = benchmark_queue<mpmc::lock_queue<unsigned int>>("SPSC using the lock-based MPMC benchmark");
   print_result(spsc_lockqueue_result);

   return 0;
}

// benchmark_result benchmark_mpmc() {
// const int kRuns = 33;
// std::vector<benchmark::result_t> results;
// double total_duration_ns = 0.0;
// double min_msgs_per_second = std::numeric_limits<double>::max();
// double max_msgs_per_second = std::numeric_limits<double>::min();
// double total_msgs_per_second = 0.0;

// for (int i = 0; i < kRuns; ++i) {
// auto queue = queue_api::CreateQueue<mpmc::lock_queue<unsigned int>>(kGoodSizedQueueSize);
// auto result = benchmark::runSPSC(queue, kNumberOfItems);
// results.push_back(result);
// total_duration_ns += result.elapsed_time_in_ns;

// double duration_s = result.elapsed_time_in_ns / 1e9; // convert ns to seconds
// double msgs_per_second = kNumberOfItems / duration_s; // messages per second for this run
// total_msgs_per_second += msgs_per_second;
// min_msgs_per_second = std::min(min_msgs_per_second, msgs_per_second);
// max_msgs_per_second = std::max(max_msgs_per_second, msgs_per_second);
// }

// double mean_msgs_per_second = total_msgs_per_second / kRuns; // mean messages per second

// benchmark_result result;
// result.runs = kRuns;
// result.num_producer_threads = 1;
// result.num_consumer_threads = 1;
// result.messages_per_iteration = kNumberOfItems;
// result.mean_msgs_per_second = mean_msgs_per_second;
// result.min_msgs_per_second = min_msgs_per_second;
// result.max_msgs_per_second = max_msgs_per_second;
// result.comment = "SPSC using the lock-based MPMC benchmark";

// return result;
// }
36 changes: 36 additions & 0 deletions benchmark/benchmark_runs.hpp
@@ -0,0 +1,36 @@
#pragma once
#include <algorithm>
#include <atomic>
#include <chrono>
#include <future>
#include "benchmark_functions.hpp"
#include "q/q_api.hpp"

namespace benchmark {
   template <typename T>
   benchmark::result_t runSPSC(T queue, size_t howMany) {
      std::atomic<bool> producerStart{false};
      std::atomic<bool> consumerStart{false};

      using namespace std;
      using namespace chrono;
      auto producer = std::get<queue_api::index::sender>(queue);
      auto consumer = std::get<queue_api::index::receiver>(queue);

      benchmark::stopwatch watch;
      size_t stop = howMany;
      auto prodResult = std::async(std::launch::async, benchmark::Push<decltype(producer)>,
                                   producer, stop, std::ref(producerStart), std::ref(consumerStart));
      auto consResult = std::async(std::launch::async, benchmark::Get<decltype(consumer)>,
                                   consumer, stop, std::ref(producerStart), std::ref(consumerStart));

      auto sent = prodResult.get();
      auto received = consResult.get();
      Q_CHECK(producer.empty());
      Q_CHECK(consumer.empty());
      Q_CHECK_EQ(sent.total_sum, received.total_sum);
      Q_CHECK(watch.elapsed_ns() >= sent.elapsed_time_in_ns);
      Q_CHECK(watch.elapsed_ns() >= received.elapsed_time_in_ns);
      return {received.total_sum, std::max(sent.elapsed_time_in_ns, received.elapsed_time_in_ns)};
   }
}  // namespace benchmark
Empty file removed benchmark/spsc_benchmark.cpp
Empty file.