diff --git a/.vscode/settings.json b/.vscode/settings.json index cad7657..c4b74f3 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,72 @@ { - "cmake.configureOnOpen": false + "cmake.configureOnOpen": false, + "files.associations": { + "__bit_reference": "cpp", + "__config": "cpp", + "__debug": "cpp", + "__errc": "cpp", + "__hash_table": "cpp", + "__locale": "cpp", + "__mutex_base": "cpp", + "__node_handle": "cpp", + "__split_buffer": "cpp", + "__threading_support": "cpp", + "__tree": "cpp", + "__verbose_abort": "cpp", + "any": "cpp", + "array": "cpp", + "atomic": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "charconv": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "complex": "cpp", + "condition_variable": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "exception": "cpp", + "forward_list": "cpp", + "fstream": "cpp", + "future": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "mutex": "cpp", + "new": "cpp", + "optional": "cpp", + "ostream": "cpp", + "queue": "cpp", + "ratio": "cpp", + "set": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "string": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "thread": "cpp", + "tuple": "cpp", + "typeinfo": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "variant": "cpp", + "vector": "cpp", + "algorithm": "cpp" + } } \ No newline at end of file diff --git a/benchmark/benchmark_functions.hpp b/benchmark/benchmark_functions.hpp index 60c2814..79c0583 100644 --- a/benchmark/benchmark_functions.hpp +++ b/benchmark/benchmark_functions.hpp @@ -12,10 +12,10 @@ #pragma once #include +#include #include #include #include -#include #include #include #include "q/q_api.hpp" diff --git a/benchmark/benchmark_main.cpp b/benchmark/benchmark_main.cpp index c55d8c5..60e14a1 100644 --- a/benchmark/benchmark_main.cpp +++ b/benchmark/benchmark_main.cpp @@ -24,15 +24,15 @@ namespace { void print_result(const benchmark_result& result) { double average_call_time_ns = result.mean_msgs_per_second ? 1e9 / result.mean_msgs_per_second : 0; - - std::cout << std::left << std::setw(6) << result.runs << ", " + std::cout << std::left + << std::setw(6) << result.runs << ", " << std::setw(6) << result.num_producer_threads << ", " << std::setw(6) << result.num_consumer_threads << ", " - << std::setw(12) << result.mean_msgs_per_second << ", " - << std::setw(12) << result.min_msgs_per_second << ", " - << std::setw(12) << result.max_msgs_per_second << ", " - << std::setw(10) << average_call_time_ns << "," - << std::setw(30) << result.comment << std::endl; + << std::setw(15) << std::fixed << std::setprecision(2) << result.mean_msgs_per_second << ", " + << std::setw(15) << result.min_msgs_per_second << ", " + << std::setw(15) << result.max_msgs_per_second << ", " + << std::setw(15) << std::fixed << std::setprecision(2) << average_call_time_ns << ", " + << result.comment << std::endl; } template diff --git a/benchmark/test_performance.cpp b/benchmark/test_performance.cpp index dffc08b..9384782 100644 --- a/benchmark/test_performance.cpp +++ b/benchmark/test_performance.cpp @@ -9,7 +9,7 @@ // * Originally published at: https://github.com/KjellKod/Q // */ -// #include "test_performance.hpp" +// //#include "test_performance.hpp" // #include // #include // #include @@ -38,189 +38,189 @@ // std::cout << "messages / second " << kAmount/(runner.nanotime * 1000000000) // } -// // TEST(Performance, SPSC_circular_fifo_CircularFifo) { -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, SPSC_circular_fifo_CircularFifo_Smaller) { -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, SPSC_Fixed_CircularFifo) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, SPSC_Fixed_CircularFifo_Smaller) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, MPMC_1_to_1) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kAmount); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, MPMC_1_to_1_Smaller) { -// // using namespace std; - -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // RunSPSC(queue, kAmount); -// // } - -// // TEST(Performance, DISABLED_SPSC_circular_fifo_20secRun_LargeData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t numberOfProducers = 1; -// // const size_t numberOfConsumers = 1; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_SPSC_Fixed_20secRun_LargeData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(); -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t numberOfProducers = 1; -// // const size_t numberOfConsumers = 1; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_MPMC_1_to_4_20secRun_LargeData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t numberOfProducers = 1; -// // const size_t numberOfConsumers = 4; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_MPMC_1_to_4_20secRun_SmallData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t small = 10; -// // std::string payload(small, 'x'); -// // EXPECT_EQ(small, payload.size()); -// // const size_t numberOfProducers = 1; -// // const size_t numberOfConsumers = 4; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_LargeData) { -// // using namespace std; -// // using element = std::string; -// // using qtype = spsc::circular_fifo; -// // using qtype_pair = std::tuple, queue_api::Receiver>; -// // std::vector queues; -// // for (size_t i = 0; i < 4; ++i) { -// // queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); -// // } - -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t kTimeToRunSec = 20; -// // RunSPMC(queues, payload, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_SmallData) { -// // using namespace std; -// // using element = std::string; -// // using qtype = spsc::circular_fifo; -// // using qtype_pair = std::tuple, queue_api::Receiver>; -// // std::vector queues; -// // for (size_t i = 0; i < 4; ++i) { -// // queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); -// // } - -// // const size_t small = 10; -// // std::string payload(small, 'x'); -// // EXPECT_EQ(small, payload.size()); -// // const size_t kTimeToRunSec = 20; -// // RunSPMC(queues, payload, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_MPMC_4_to_1_20secRun_LargeData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t numberOfProducers = 4; -// // const size_t numberOfConsumers = 1; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_MPMC_4_to_1_20secRun_SmallData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t small = 10; -// // std::string payload(small, 'x'); -// // EXPECT_EQ(small, payload.size()); -// // const size_t numberOfProducers = 4; -// // const size_t numberOfConsumers = 1; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_LargeData) { -// // using namespace std; -// // using element = std::string; -// // using qtype = spsc::circular_fifo; -// // using qtype_pair = std::tuple, queue_api::Receiver>; -// // std::vector queues; -// // for (size_t i = 0; i < 4; ++i) { -// // queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); -// // } - -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t kTimeToRunSec = 20; -// // RunMPSC(queues, payload, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_SmallData) { -// // using namespace std; -// // using element = std::string; -// // using qtype = spsc::circular_fifo; -// // using qtype_pair = std::tuple, queue_api::Receiver>; -// // std::vector queues; -// // for (size_t i = 0; i < 4; ++i) { -// // queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); -// // } - -// // const size_t small = 10; -// // std::string payload(small, 'x'); -// // EXPECT_EQ(small, payload.size()); -// // const size_t kTimeToRunSec = 20; -// // RunMPSC(queues, payload, kTimeToRunSec); -// // } - -// // TEST(Performance, DISABLED_MPMC_4_to_4_20secRun_LargeData) { -// // using namespace std; -// // auto queue = queue_api::CreateQueue>(kSmallQueueSize); -// // const size_t large = 65000; -// // std::string payload(large, 'x'); -// // EXPECT_EQ(large, payload.size()); -// // const size_t numberOfProducers = 4; -// // const size_t numberOfConsumers = 4; -// // const size_t kTimeToRunSec = 20; -// // RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); -// // } +// TEST(Performance, SPSC_circular_fifo_CircularFifo) { +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, SPSC_circular_fifo_CircularFifo_Smaller) { +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, SPSC_Fixed_CircularFifo) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, SPSC_Fixed_CircularFifo_Smaller) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, MPMC_1_to_1) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kAmount); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, MPMC_1_to_1_Smaller) { +// using namespace std; + +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// RunSPSC(queue, kAmount); +// } + +// TEST(Performance, DISABLED_SPSC_circular_fifo_20secRun_LargeData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t numberOfProducers = 1; +// const size_t numberOfConsumers = 1; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_SPSC_Fixed_20secRun_LargeData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(); +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t numberOfProducers = 1; +// const size_t numberOfConsumers = 1; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_MPMC_1_to_4_20secRun_LargeData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t numberOfProducers = 1; +// const size_t numberOfConsumers = 4; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_MPMC_1_to_4_20secRun_SmallData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t small = 10; +// std::string payload(small, 'x'); +// EXPECT_EQ(small, payload.size()); +// const size_t numberOfProducers = 1; +// const size_t numberOfConsumers = 4; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_LargeData) { +// using namespace std; +// using element = std::string; +// using qtype = spsc::circular_fifo; +// using qtype_pair = std::tuple, queue_api::Receiver>; +// std::vector queues; +// for (size_t i = 0; i < 4; ++i) { +// queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); +// } + +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t kTimeToRunSec = 20; +// RunSPMC(queues, payload, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_SmallData) { +// using namespace std; +// using element = std::string; +// using qtype = spsc::circular_fifo; +// using qtype_pair = std::tuple, queue_api::Receiver>; +// std::vector queues; +// for (size_t i = 0; i < 4; ++i) { +// queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); +// } + +// const size_t small = 10; +// std::string payload(small, 'x'); +// EXPECT_EQ(small, payload.size()); +// const size_t kTimeToRunSec = 20; +// RunSPMC(queues, payload, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_MPMC_4_to_1_20secRun_LargeData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t numberOfProducers = 4; +// const size_t numberOfConsumers = 1; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_MPMC_4_to_1_20secRun_SmallData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t small = 10; +// std::string payload(small, 'x'); +// EXPECT_EQ(small, payload.size()); +// const size_t numberOfProducers = 4; +// const size_t numberOfConsumers = 1; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_LargeData) { +// using namespace std; +// using element = std::string; +// using qtype = spsc::circular_fifo; +// using qtype_pair = std::tuple, queue_api::Receiver>; +// std::vector queues; +// for (size_t i = 0; i < 4; ++i) { +// queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); +// } + +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t kTimeToRunSec = 20; +// RunMPSC(queues, payload, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_SmallData) { +// using namespace std; +// using element = std::string; +// using qtype = spsc::circular_fifo; +// using qtype_pair = std::tuple, queue_api::Receiver>; +// std::vector queues; +// for (size_t i = 0; i < 4; ++i) { +// queues.push_back(queue_api::CreateQueue(kSmallQueueSize)); +// } + +// const size_t small = 10; +// std::string payload(small, 'x'); +// EXPECT_EQ(small, payload.size()); +// const size_t kTimeToRunSec = 20; +// RunMPSC(queues, payload, kTimeToRunSec); +// } + +// TEST(Performance, DISABLED_MPMC_4_to_4_20secRun_LargeData) { +// using namespace std; +// auto queue = queue_api::CreateQueue>(kSmallQueueSize); +// const size_t large = 65000; +// std::string payload(large, 'x'); +// EXPECT_EQ(large, payload.size()); +// const size_t numberOfProducers = 4; +// const size_t numberOfConsumers = 4; +// const size_t kTimeToRunSec = 20; +// RunMPMC(queue, payload, numberOfProducers, numberOfConsumers, kTimeToRunSec); +// } diff --git a/benchmark/test_performance.hpp b/benchmark/test_performance.hpp index 26fefdb..0b49367 100644 --- a/benchmark/test_performance.hpp +++ b/benchmark/test_performance.hpp @@ -1,16 +1,16 @@ -/* -* Not any company's property but Public-Domain -* Do with source-code as you will. No requirement to keep this -* header if need to use it/change it/ or do whatever with it -* -* Note that there is No guarantee that this code will work -* and I take no responsibility for this code and any problems you -* might get if using it. -* -* Originally published at https://github.com/KjellKod/Q -*/ - -#pragma once +// /* +// * Not any company's property but Public-Domain +// * Do with source-code as you will. No requirement to keep this +// * header if need to use it/change it/ or do whatever with it +// * +// * Note that there is No guarantee that this code will work +// * and I take no responsibility for this code and any problems you +// * might get if using it. +// * +// * Originally published at https://github.com/KjellKod/Q +// */ + +// #pragma once // #include // #include // #include @@ -21,7 +21,7 @@ // #include // #include // #include -// #include "test_helper.hpp" +// //#include "test_helper.hpp" // namespace test_performance { // using benchmark::result_t = std::vector; @@ -121,252 +121,252 @@ // return amountReceived; // } -// // template -// // std::vector RunSPS2C(T queue, size_t howMany) { -// // std::atomic producerStart{false}; -// // std::atomic consumerStart{false}; -// // std::vector result; - -// // using namespace std; -// // using namespace chrono; -// // auto producer = std::get(queue); -// // auto consumer = std::get(queue); - -// // benchmark::stopwatch watch; -// // size_t start = 1; -// // size_t stop = howMany; -// // auto prodResult = std::async(std::launch::async, Push, -// // producer, start, stop, std::ref(producerStart), std::ref(consumerStart)); -// // auto consResult = std::async(std::launch::async, Get, -// // consumer, start, stop, std::ref(producerStart), std::ref(consumerStart)); - -// // auto sent = prodResult.get(); -// // auto received = consResult.get(); -// // benchmark::result_t expected = {watch.ElapsedNs(), 0 }; -// // return {sent, received, expected}; -// // // auto t2 = high_resolution_clock::now(); -// // // auto us = duration_cast(t2 - t1).count(); -// // // std::cout << "Push - Pull #" << howMany << " items in: " << us << " us" << std::endl; -// // // std::cout << "Average: " << 1000 * ((float)us / (float)howMany) << " ns" << std::endl; -// // } - -// // template -// // void RunSPSC(T queue, size_t howMany) { -// // std::atomic producerStart{false}; -// // std::atomic consumerStart{false}; - -// // using namespace std; -// // using namespace chrono; -// // auto producer = std::get(queue); -// // auto consumer = std::get(queue); - -// // auto t1 = high_resolution_clock::now(); -// // size_t start = 1; -// // size_t stop = howMany; -// // auto prodResult = std::async(std::launch::async, Push, -// // producer, start, stop, std::ref(producerStart), std::ref(consumerStart)); -// // auto consResult = std::async(std::launch::async, Get, -// // consumer, start, stop, std::ref(producerStart), std::ref(consumerStart)); - -// // auto expected = prodResult.get(); -// // auto received = consResult.get(); -// // auto t2 = high_resolution_clock::now(); -// // auto us = duration_cast(t2 - t1).count(); -// // std::cout << "Push - Pull #" << howMany << " items in: " << us << " us" << std::endl; -// // std::cout << "Average: " << 1000 * ((float)us / (float)howMany) << " ns" << std::endl; - -// // EXPECT_EQ(howMany, received.size()); -// // EXPECT_EQ(expected, received); -// // } - -// // template -// // void RunMPMC(T queue, std::string data, size_t numberProducers, -// // size_t numberConsumers, const size_t timeToRunInSec) { -// // std::atomic producerCount{0}; -// // std::atomic consumerCount{0}; -// // std::atomic producerStop{false}; -// // std::atomic consumerStop{false}; - -// // using namespace std; -// // using namespace std::chrono; -// // auto producer = std::get(queue); -// // auto consumer = std::get(queue); -// // std::vector> producerResult; -// // producerResult.reserve(numberProducers); - -// // for (size_t i = 0; i < numberProducers; ++i) { -// // producerResult.emplace_back(std::async(std::launch::async, PushUntil, -// // producer, data, -// // std::ref(producerCount), -// // std::ref(producerStop))); -// // } -// // std::vector> consumerResult; -// // consumerResult.reserve(numberConsumers); -// // for (size_t i = 0; i < numberConsumers; ++i) { -// // consumerResult.emplace_back(std::async(std::launch::async, GetUntil, -// // consumer, data, -// // std::ref(consumerCount), -// // std::ref(consumerStop))); -// // } - -// // using namespace std::chrono_literals; -// // while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { -// // std::this_thread::sleep_for(1us); -// // } -// // benchmark::stopwatch elapsedRun; -// // while (elapsedRun.elapsed_sec() < timeToRunInSec) { -// // std::this_thread::sleep_for(1us); -// // } - -// // producerStop.store(true, std::memory_order_release); -// // size_t amountProduced = 0; -// // for (auto& result : producerResult) { -// // amountProduced += result.get(); -// // } -// // consumerStop.store(true, std::memory_order_release); -// // size_t amountConsumed = 0; -// // for (auto& result : consumerResult) { -// // amountConsumed += result.get(); -// // } - -// // // amoundProduced >= amountConsumed -// // // amountProduced <= amountConsumed + 100 -// // EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced -// // << ", consumed: " << amountConsumed << ", capacity: " << producer.capacity(); - -// // auto elapsedTimeNs = elapsedRun.ElapsedNs(); -// // auto elapsedTimeSec = elapsedTimeNs / (1000000000); -// // std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; -// // std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; -// // std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; -// // std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; -// // } - -// // template -// // void RunMPSC(std::vector queues, std::string data, const size_t timeToRunInSec) { -// // std::atomic producerCount{0}; -// // std::atomic consumerCount{0}; -// // std::atomic producerStop{false}; -// // std::atomic consumerStop{false}; - -// // std::vector> receivers; -// // std::vector> senders; - -// // for (auto& q : queues) { -// // receivers.push_back(std::get(q)); -// // senders.push_back(std::get(q)); -// // } -// // const size_t numberProducers = senders.size(); -// // mpsc::round_robin::Receiver consumer(receivers); - -// // std::vector> producerResult; -// // producerResult.reserve(senders.size()); - -// // for (size_t i = 0; i < senders.size(); ++i) { -// // auto producer = senders[i]; -// // producerResult.emplace_back(std::async(std::launch::async, PushUntil, senders[i], data, -// // std::ref(producerCount), std::ref(producerStop))); -// // } -// // std::vector> consumerResult; -// // const size_t numberConsumers = 1; -// // consumerResult.reserve(numberConsumers); -// // consumerResult.emplace_back(std::async(std::launch::async, GetUntil, consumer, data, -// // std::ref(consumerCount), std::ref(consumerStop))); - -// // using namespace std::chrono_literals; -// // while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { -// // std::this_thread::sleep_for(1us); -// // } -// // benchmark::stopwatch elapsedRun; -// // while (elapsedRun.elapsed_sec() < timeToRunInSec) { -// // std::this_thread::sleep_for(1us); -// // } - -// // producerStop.store(true, std::memory_order_release); -// // size_t amountProduced = 0; -// // for (auto& result : producerResult) { -// // amountProduced += result.get(); -// // } -// // consumerStop.store(true, std::memory_order_release); -// // size_t amountConsumed = 0; -// // for (auto& result : consumerResult) { -// // amountConsumed += result.get(); -// // } - -// // // amoundProduced >= amountConsumed -// // // amountProduced <= amountConsumed + 100 -// // EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced << ", consumed: " << amountConsumed << ", capacity: " << consumer.capacity(); - -// // auto elapsedTimeNs = elapsedRun.ElapsedNs(); -// // auto elapsedTimeSec = elapsedTimeNs / (1000000000); -// // std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; -// // std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; - -// // std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; -// // std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; -// // } - -// // template -// // void RunSPMC(std::vector queues, std::string data, const size_t timeToRunInSec) { -// // std::atomic producerCount{0}; -// // std::atomic consumerCount{0}; -// // std::atomic producerStop{false}; -// // std::atomic consumerStop{false}; - -// // std::vector> receivers; -// // std::vector> senders; - -// // for (auto& q : queues) { -// // receivers.push_back(std::get(q)); -// // senders.push_back(std::get(q)); -// // } -// // const size_t numberConsumers = senders.size(); -// // spmc::round_robin::Sender producer(senders); -// // std::vector> producerResult; -// // const size_t numberProducers = 1; -// // producerResult.reserve(numberProducers); -// // producerResult.emplace_back(std::async(std::launch::async, PushUntil, producer, data, -// // std::ref(producerCount), std::ref(producerStop))); - -// // std::vector> consumerResult; -// // consumerResult.reserve(receivers.size()); -// // for (size_t i = 0; i < receivers.size(); ++i) { -// // auto consumer = receivers[i]; -// // consumerResult.emplace_back(std::async(std::launch::async, GetUntil, receivers[i], data, -// // std::ref(consumerCount), std::ref(consumerStop))); -// // } - -// // using namespace std::chrono_literals; -// // while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { -// // std::this_thread::sleep_for(1us); -// // } -// // benchmark::stopwatch elapsedRun; -// // while (elapsedRun.elapsed_sec() < timeToRunInSec) { -// // std::this_thread::sleep_for(1us); -// // } - -// // producerStop.store(true, std::memory_order_release); -// // size_t amountProduced = 0; -// // for (auto& result : producerResult) { -// // amountProduced += result.get(); -// // } -// // consumerStop.store(true, std::memory_order_release); -// // size_t amountConsumed = 0; -// // for (auto& result : consumerResult) { -// // amountConsumed += result.get(); -// // } - -// // // amoundProduced >= amountConsumed -// // // amountProduced <= amountConsumed + 100 -// // EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced << ", consumed: " << amountConsumed << ", capacity: " << producer.capacity(); - -// // auto elapsedTimeNs = elapsedRun.ElapsedNs(); -// // auto elapsedTimeSec = elapsedTimeNs / (1000000000); -// // std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; -// // std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; - -// // std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; -// // std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; -// // } - -// // } // namespace test_performance \ No newline at end of file +// template +// std::vector RunSPS2C(T queue, size_t howMany) { +// std::atomic producerStart{false}; +// std::atomic consumerStart{false}; +// std::vector result; + +// using namespace std; +// using namespace chrono; +// auto producer = std::get(queue); +// auto consumer = std::get(queue); + +// benchmark::stopwatch watch; +// size_t start = 1; +// size_t stop = howMany; +// auto prodResult = std::async(std::launch::async, Push, +// producer, start, stop, std::ref(producerStart), std::ref(consumerStart)); +// auto consResult = std::async(std::launch::async, Get, +// consumer, start, stop, std::ref(producerStart), std::ref(consumerStart)); + +// auto sent = prodResult.get(); +// auto received = consResult.get(); +// benchmark::result_t expected = {watch.ElapsedNs(), 0}; +// return {sent, received, expected}; +// // auto t2 = high_resolution_clock::now(); +// // auto us = duration_cast(t2 - t1).count(); +// // std::cout << "Push - Pull #" << howMany << " items in: " << us << " us" << std::endl; +// // std::cout << "Average: " << 1000 * ((float)us / (float)howMany) << " ns" << std::endl; +// } + +// template +// void RunSPSC(T queue, size_t howMany) { +// std::atomic producerStart{false}; +// std::atomic consumerStart{false}; + +// using namespace std; +// using namespace chrono; +// auto producer = std::get(queue); +// auto consumer = std::get(queue); + +// auto t1 = high_resolution_clock::now(); +// size_t start = 1; +// size_t stop = howMany; +// auto prodResult = std::async(std::launch::async, Push, +// producer, start, stop, std::ref(producerStart), std::ref(consumerStart)); +// auto consResult = std::async(std::launch::async, Get, +// consumer, start, stop, std::ref(producerStart), std::ref(consumerStart)); + +// auto expected = prodResult.get(); +// auto received = consResult.get(); +// auto t2 = high_resolution_clock::now(); +// auto us = duration_cast(t2 - t1).count(); +// std::cout << "Push - Pull #" << howMany << " items in: " << us << " us" << std::endl; +// std::cout << "Average: " << 1000 * ((float)us / (float)howMany) << " ns" << std::endl; + +// EXPECT_EQ(howMany, received.size()); +// EXPECT_EQ(expected, received); +// } + +// template +// void RunMPMC(T queue, std::string data, size_t numberProducers, +// size_t numberConsumers, const size_t timeToRunInSec) { +// std::atomic producerCount{0}; +// std::atomic consumerCount{0}; +// std::atomic producerStop{false}; +// std::atomic consumerStop{false}; + +// using namespace std; +// using namespace std::chrono; +// auto producer = std::get(queue); +// auto consumer = std::get(queue); +// std::vector> producerResult; +// producerResult.reserve(numberProducers); + +// for (size_t i = 0; i < numberProducers; ++i) { +// producerResult.emplace_back(std::async(std::launch::async, PushUntil, +// producer, data, +// std::ref(producerCount), +// std::ref(producerStop))); +// } +// std::vector> consumerResult; +// consumerResult.reserve(numberConsumers); +// for (size_t i = 0; i < numberConsumers; ++i) { +// consumerResult.emplace_back(std::async(std::launch::async, GetUntil, +// consumer, data, +// std::ref(consumerCount), +// std::ref(consumerStop))); +// } + +// using namespace std::chrono_literals; +// while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { +// std::this_thread::sleep_for(1us); +// } +// benchmark::stopwatch elapsedRun; +// while (elapsedRun.elapsed_sec() < timeToRunInSec) { +// std::this_thread::sleep_for(1us); +// } + +// producerStop.store(true, std::memory_order_release); +// size_t amountProduced = 0; +// for (auto& result : producerResult) { +// amountProduced += result.get(); +// } +// consumerStop.store(true, std::memory_order_release); +// size_t amountConsumed = 0; +// for (auto& result : consumerResult) { +// amountConsumed += result.get(); +// } + +// // amoundProduced >= amountConsumed +// // amountProduced <= amountConsumed + 100 +// EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced +// << ", consumed: " << amountConsumed << ", capacity: " << producer.capacity(); + +// auto elapsedTimeNs = elapsedRun.ElapsedNs(); +// auto elapsedTimeSec = elapsedTimeNs / (1000000000); +// std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; +// std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; +// std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; +// std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; +// } + +// template +// void RunMPSC(std::vector queues, std::string data, const size_t timeToRunInSec) { +// std::atomic producerCount{0}; +// std::atomic consumerCount{0}; +// std::atomic producerStop{false}; +// std::atomic consumerStop{false}; + +// std::vector> receivers; +// std::vector> senders; + +// for (auto& q : queues) { +// receivers.push_back(std::get(q)); +// senders.push_back(std::get(q)); +// } +// const size_t numberProducers = senders.size(); +// mpsc::fixed_size::round_robin::Receiver consumer(receivers); + +// std::vector> producerResult; +// producerResult.reserve(senders.size()); + +// for (size_t i = 0; i < senders.size(); ++i) { +// auto producer = senders[i]; +// producerResult.emplace_back(std::async(std::launch::async, PushUntil, senders[i], data, +// std::ref(producerCount), std::ref(producerStop))); +// } +// std::vector> consumerResult; +// const size_t numberConsumers = 1; +// consumerResult.reserve(numberConsumers); +// consumerResult.emplace_back(std::async(std::launch::async, GetUntil, consumer, data, +// std::ref(consumerCount), std::ref(consumerStop))); + +// using namespace std::chrono_literals; +// while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { +// std::this_thread::sleep_for(1us); +// } +// benchmark::stopwatch elapsedRun; +// while (elapsedRun.elapsed_sec() < timeToRunInSec) { +// std::this_thread::sleep_for(1us); +// } + +// producerStop.store(true, std::memory_order_release); +// size_t amountProduced = 0; +// for (auto& result : producerResult) { +// amountProduced += result.get(); +// } +// consumerStop.store(true, std::memory_order_release); +// size_t amountConsumed = 0; +// for (auto& result : consumerResult) { +// amountConsumed += result.get(); +// } + +// // amoundProduced >= amountConsumed +// // amountProduced <= amountConsumed + 100 +// EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced << ", consumed: " << amountConsumed << ", capacity: " << consumer.capacity(); + +// auto elapsedTimeNs = elapsedRun.ElapsedNs(); +// auto elapsedTimeSec = elapsedTimeNs / (1000000000); +// std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; +// std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; + +// std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; +// std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; +// } + +// template +// void RunSPMC(std::vector queues, std::string data, const size_t timeToRunInSec) { +// std::atomic producerCount{0}; +// std::atomic consumerCount{0}; +// std::atomic producerStop{false}; +// std::atomic consumerStop{false}; + +// std::vector> receivers; +// std::vector> senders; + +// for (auto& q : queues) { +// receivers.push_back(std::get(q)); +// senders.push_back(std::get(q)); +// } +// const size_t numberConsumers = senders.size(); +// spmc::fixed_size::round_robin::Sender producer(senders); +// std::vector> producerResult; +// const size_t numberProducers = 1; +// producerResult.reserve(numberProducers); +// producerResult.emplace_back(std::async(std::launch::async, PushUntil, producer, data, +// std::ref(producerCount), std::ref(producerStop))); + +// std::vector> consumerResult; +// consumerResult.reserve(receivers.size()); +// for (size_t i = 0; i < receivers.size(); ++i) { +// auto consumer = receivers[i]; +// consumerResult.emplace_back(std::async(std::launch::async, GetUntil, receivers[i], data, +// std::ref(consumerCount), std::ref(consumerStop))); +// } + +// using namespace std::chrono_literals; +// while (consumerCount.load() < numberConsumers && producerCount.load() < numberProducers) { +// std::this_thread::sleep_for(1us); +// } +// benchmark::stopwatch elapsedRun; +// while (elapsedRun.elapsed_sec() < timeToRunInSec) { +// std::this_thread::sleep_for(1us); +// } + +// producerStop.store(true, std::memory_order_release); +// size_t amountProduced = 0; +// for (auto& result : producerResult) { +// amountProduced += result.get(); +// } +// consumerStop.store(true, std::memory_order_release); +// size_t amountConsumed = 0; +// for (auto& result : consumerResult) { +// amountConsumed += result.get(); +// } + +// // amoundProduced >= amountConsumed +// // amountProduced <= amountConsumed + 100 +// EXPECT_GE(amountProduced, amountConsumed) << "produced: " << amountProduced << ", consumed: " << amountConsumed << ", capacity: " << producer.capacity(); + +// auto elapsedTimeNs = elapsedRun.ElapsedNs(); +// auto elapsedTimeSec = elapsedTimeNs / (1000000000); +// std::cout << "Transaction/s: " << amountConsumed / elapsedTimeSec << std::endl; +// std::cout << "Average transaction: " << elapsedTimeNs / amountConsumed << " ns" << std::endl; + +// std::cout << "Transaction/s per consumer: " << amountConsumed / elapsedTimeSec / numberConsumers << std::endl; +// std::cout << "Transation GByte/s: " << amountConsumed * data.size() / (1024 * 1024 * 1024) / elapsedTimeSec << std::endl; +// } + +// } // namespace test_performance \ No newline at end of file diff --git a/src/q/mpsc_receiver_round_robin.hpp b/src/q/mpsc_receiver_round_robin.hpp index 24aed0d..3f2f8ca 100644 --- a/src/q/mpsc_receiver_round_robin.hpp +++ b/src/q/mpsc_receiver_round_robin.hpp @@ -33,70 +33,73 @@ // MPSC : Many Single Producers - Single Consumer namespace mpsc { - namespace round_robin { - // Use case: Many producers, one consumer. - // Instead of using a MPMC queue you can use the MPSC - // (Many Single Producer - One Single Consumer) queue. - // - // The Consumer will round-robin fetch attempts in a fair scheduling policy - // the lock-free base type will typcially make this a much faster choice than using the mutex - // protected MPSC - // - // WARNING: The same constraints as SPSC are in place for this queue. Only ONE thread may - // act as the consumer - template - class Receiver : public ::round_robin::API> { - public: - using QueueAPI = ::round_robin::API>; + namespace fixed_size { + namespace round_robin { + // Use case: Many producers, one consumer. + // Instead of using a MPMC queue you can use the MPSC + // (Many Single Producer - One Single Consumer) queue. + // + // The Consumer will round-robin fetch attempts in a fair scheduling policy + // the lock-free base type will typcially make this a much faster choice than using the mutex + // protected MPSC + // + // WARNING: The same constraints as SPSC are in place for this queue. Only ONE thread may + // act as the consumer + template + class Receiver : public ::round_robin::API> { + public: + using QueueAPI = ::round_robin::API>; - Receiver(std::vector> Receivers); - virtual ~Receiver() = default; + /// mpsc - by using vector of producer queues - receivers,that the single consumer will pop from in round-robin manner + Receiver(std::vector> Receivers); + virtual ~Receiver() = default; - template - bool pop(Element& item); + template + bool pop(Element& item); - template - bool wait_and_pop(Element& item, const std::chrono::milliseconds wait_ms); - }; + template + bool wait_and_pop(Element& item, const std::chrono::milliseconds wait_ms); + }; - template - Receiver::Receiver(std::vector> receivers) : - QueueAPI(receivers) {} + template + Receiver::Receiver(std::vector> receivers) : + QueueAPI(receivers) {} - template - template - bool Receiver::pop(Element& item) { - bool result = false; - const size_t loop_check = QueueAPI::queues_.size(); + template + template + bool Receiver::pop(Element& item) { + bool result = false; + const size_t loop_check = QueueAPI::queues_.size(); - size_t count = 0; - while (!result && count++ < loop_check) { - result = QueueAPI::queues_[QueueAPI::current_].pop(item); - QueueAPI::current_ = QueueAPI::increment(QueueAPI::current_); + size_t count = 0; + while (!result && count++ < loop_check) { + result = QueueAPI::queues_[QueueAPI::current_].pop(item); + QueueAPI::current_ = QueueAPI::increment(QueueAPI::current_); + } + return result; } - return result; - } - template - template - bool Receiver::wait_and_pop(Element& item, const std::chrono::milliseconds max_wait) { - using milliseconds = std::chrono::milliseconds; - using clock = std::chrono::steady_clock; - using namespace std::chrono_literals; - auto t1 = clock::now(); - bool result = false; - const size_t wrap = QueueAPI::current_; - while (!result) { - result = pop(item); - auto elapsed_ms = std::chrono::duration_cast(clock::now() - t1); - if (elapsed_ms > max_wait) { - break; - } - if (false == result && wrap == QueueAPI::current_) { - std::this_thread::sleep_for(100ns); + template + template + bool Receiver::wait_and_pop(Element& item, const std::chrono::milliseconds max_wait) { + using milliseconds = std::chrono::milliseconds; + using clock = std::chrono::steady_clock; + using namespace std::chrono_literals; + auto t1 = clock::now(); + bool result = false; + const size_t wrap = QueueAPI::current_; + while (!result) { + result = pop(item); + auto elapsed_ms = std::chrono::duration_cast(clock::now() - t1); + if (elapsed_ms > max_wait) { + break; + } + if (false == result && wrap == QueueAPI::current_) { + std::this_thread::sleep_for(100ns); + } } + return result; } - return result; - } - } // namespace round_robin + } // namespace round_robin + } // namespace fixed_size } // namespace mpsc diff --git a/src/q/spmc_sender_round_robin.hpp b/src/q/spmc_sender_round_robin.hpp index c1d0da5..580afa6 100644 --- a/src/q/spmc_sender_round_robin.hpp +++ b/src/q/spmc_sender_round_robin.hpp @@ -34,45 +34,48 @@ // MPSC : Many Single Producers - Single Consumer namespace spmc { - namespace round_robin { - // Use case: One Producer Many Consumers - // Instead of using a MPMC queue you can use the SPMC - // (One Single Consumer) - Many Single Producer (spsc queues)- - // - // The Producer will round-robin fetch attempts in a fair scheduling policy - // the lock-free base type will typcially make this a much faster choice than using the mutex - // protected MPSC - // - // WARNING: The same constraints as SPSC are in place for this queue. Only ONE thread may - // act as the producer - template - class Sender : public ::round_robin::API> { - public: - using QueueAPI = ::round_robin::API>; + namespace fixed_size { + namespace round_robin { + // Use case: One Producer Many Consumers + // Instead of using a MPMC queue you can use the SPMC + // (One Single Consumer) - Many Single Producer (spsc queues)- + // + // The Producer will round-robin fetch attempts in a fair scheduling policy + // the lock-free base type will typcially make this a much faster choice than using the mutex + // protected MPSC + // + // WARNING: The same constraints as SPSC are in place for this queue. Only ONE thread may + // act as the producer + template + class Sender : public ::round_robin::API> { + public: + using QueueAPI = ::round_robin::API>; - Sender(std::vector> senders); - virtual ~Sender() = default; + /// spmc - by using vector of consumer queues, the producer will round robin each push to fair scheduling for the multiplc consumers + Sender(std::vector> senders); + virtual ~Sender() = default; - template - bool push(Element& item); - }; + template + bool push(Element& item); + }; - template - Sender::Sender(std::vector> senders) : - QueueAPI(senders) {} + template + Sender::Sender(std::vector> senders) : + QueueAPI(senders) {} - template - template - bool Sender::push(Element& item) { - bool result = false; - const size_t loop_check = QueueAPI::queues_.size(); + template + template + bool Sender::push(Element& item) { + bool result = false; + const size_t loop_check = QueueAPI::queues_.size(); - size_t count = 0; - while (!result && count++ < loop_check) { - result = QueueAPI::queues_[QueueAPI::current_].push(item); - QueueAPI::current_ = QueueAPI::increment(QueueAPI::current_); + size_t count = 0; + while (!result && count++ < loop_check) { + result = QueueAPI::queues_[QueueAPI::current_].push(item); + QueueAPI::current_ = QueueAPI::increment(QueueAPI::current_); + } + return result; } - return result; - } - } // namespace round_robin + } // namespace round_robin + } // namespace fixed_size } // namespace spmc diff --git a/src/q/spsc_circular_fifo.hpp b/src/q/spsc_circular_fifo.hpp index 8e02690..42d743d 100644 --- a/src/q/spsc_circular_fifo.hpp +++ b/src/q/spsc_circular_fifo.hpp @@ -30,111 +30,111 @@ #include namespace spsc { - template - class circular_fifo { - public: - explicit circular_fifo(const size_t size) : - kSize(size), - kCapacity(kSize + 1), - array_(kCapacity), - tail_(0), - head_(0) { - } - - virtual ~circular_fifo() {} - - bool push(Element& item); - bool pop(Element& item); - bool empty() const; - bool full() const; - size_t capacity() const; - size_t capacity_free() const; - size_t usage() const; - size_t size() const; - bool lock_free() const; - size_t tail() const { return tail_.load(); } - size_t head() const { return head_.load(); } - - private: - typedef char cache_line[64]; - size_t increment(size_t idx) const { return (idx + 1) % kCapacity; } - const size_t kSize; - const size_t kCapacity; - - cache_line pad_storage_; - std::vector array_; - - cache_line padtail_; - std::atomic tail_; - cache_line padhead_; - std::atomic head_; // head(output) index - cache_line padend_; - }; - - template - bool circular_fifo::push(Element& item) { - const auto currenttail_ = tail_.load(std::memory_order_relaxed); - const auto nexttail_ = increment(currenttail_); - - if (nexttail_ != head_.load(std::memory_order_acquire)) { - array_[currenttail_] = std::move(item); - tail_.store(nexttail_, std::memory_order_release); - return true; - } - - return false; // full queue + template + class circular_fifo { + public: + explicit circular_fifo(const size_t size) : + kSize(size), + kCapacity(kSize + 1), + array_(kCapacity), + tail_(0), + head_(0) { } - // Pop by Consumer can only update the head (load with relaxed, store with release) - // the tail must be accessed with at least aquire - template - bool circular_fifo::pop(Element& item) { - const auto currenthead_ = head_.load(std::memory_order_relaxed); - if (currenthead_ == tail_.load(std::memory_order_acquire)) { - return false; // empty queue - } - - item = std::move(array_[currenthead_]); - head_.store(increment(currenthead_), std::memory_order_release); + virtual ~circular_fifo() {} + + bool push(Element& item); + bool pop(Element& item); + bool empty() const; + bool full() const; + size_t capacity() const; + size_t capacity_free() const; + size_t usage() const; + size_t size() const; + bool lock_free() const; + size_t tail() const { return tail_.load(); } + size_t head() const { return head_.load(); } + + private: + typedef char cache_line[64]; + size_t increment(size_t idx) const { return (idx + 1) % kCapacity; } + const size_t kSize; + const size_t kCapacity; + + cache_line pad_storage_; + std::vector array_; + + cache_line padtail_; + std::atomic tail_; + cache_line padhead_; + std::atomic head_; // head(output) index + cache_line padend_; + }; + + template + bool circular_fifo::push(Element& item) { + const auto currenttail_ = tail_.load(std::memory_order_relaxed); + const auto nexttail_ = increment(currenttail_); + + if (nexttail_ != head_.load(std::memory_order_acquire)) { + array_[currenttail_] = std::move(item); + tail_.store(nexttail_, std::memory_order_release); return true; } - template - bool circular_fifo::empty() const { - // snapshot with acceptance of that this comparison operation is not atomic - return (head_.load(std::memory_order_relaxed) == tail_.load(std::memory_order_relaxed)); - } - - // snapshot with acceptance that this comparison is not atomic - template - bool circular_fifo::full() const { - const auto nexttail_ = increment(tail_.load(std::memory_order_relaxed)); // aquire, we dont know who call - return (nexttail_ == head_.load(std::memory_order_relaxed)); - } - - template - bool circular_fifo::lock_free() const { - return std::atomic{}.is_lock_free(); - } + return false; // full queue + } - template - size_t circular_fifo::size() const { - return ((tail_.load() - head_.load() + kCapacity) % kCapacity); + // Pop by Consumer can only update the head (load with relaxed, store with release) + // the tail must be accessed with at least aquire + template + bool circular_fifo::pop(Element& item) { + const auto currenthead_ = head_.load(std::memory_order_relaxed); + if (currenthead_ == tail_.load(std::memory_order_acquire)) { + return false; // empty queue } - template - size_t circular_fifo::capacity_free() const { - return (kCapacity - size() - 1); - } - - template - size_t circular_fifo::capacity() const { - return kSize; - } - - // percent usage - template - size_t circular_fifo::usage() const { - return (100 * size() / kSize); - } + item = std::move(array_[currenthead_]); + head_.store(increment(currenthead_), std::memory_order_release); + return true; + } + + template + bool circular_fifo::empty() const { + // snapshot with acceptance of that this comparison operation is not atomic + return (head_.load(std::memory_order_relaxed) == tail_.load(std::memory_order_relaxed)); + } + + // snapshot with acceptance that this comparison is not atomic + template + bool circular_fifo::full() const { + const auto nexttail_ = increment(tail_.load(std::memory_order_relaxed)); // aquire, we dont know who call + return (nexttail_ == head_.load(std::memory_order_relaxed)); + } + + template + bool circular_fifo::lock_free() const { + return std::atomic{}.is_lock_free(); + } + + template + size_t circular_fifo::size() const { + return ((tail_.load() - head_.load() + kCapacity) % kCapacity); + } + + template + size_t circular_fifo::capacity_free() const { + return (kCapacity - size() - 1); + } + + template + size_t circular_fifo::capacity() const { + return kSize; + } + + // percent usage + template + size_t circular_fifo::usage() const { + return (100 * size() / kSize); + } } // namespace spsc diff --git a/src/q/test_queue.cpp b/src/q/test_queue.cpp index 14620f4..0864e74 100644 --- a/src/q/test_queue.cpp +++ b/src/q/test_queue.cpp @@ -128,7 +128,6 @@ TEST(Queue, BaseAPI_circular_fifo) { EXPECT_EQ(0, producer.usage()); } - TEST(Queue, BaseAPI_DynamicLocked) { auto queue = queue_api::CreateQueue(10); auto producer = std::get(queue); @@ -249,7 +248,6 @@ TEST(Queue, circular_fifoQueue_AddTillFullRemoveTillEmpty) { AddTillFullRemoveTillEmpty(producer, consumer); } - TEST(Queue, LockedQ_AddTillFullRemoveTillEmpty) { auto queue = queue_api::CreateQueue(100); auto producer = std::get(queue); diff --git a/test/test_mpsc_spmc_round_robin_queues.cpp b/test/test_mpsc_round_robin_queues.cpp similarity index 63% rename from test/test_mpsc_spmc_round_robin_queues.cpp rename to test/test_mpsc_round_robin_queues.cpp index 7023e06..a5b38f7 100644 --- a/test/test_mpsc_spmc_round_robin_queues.cpp +++ b/test/test_mpsc_round_robin_queues.cpp @@ -15,7 +15,7 @@ #include "q/spmc_sender_round_robin.hpp" #include "q/spsc_circular_fifo.hpp" -TEST(MPSC_SPMC, CreateOneQueue_MPSC) { +TEST(MultipleProducers_SingleConsumer, CreateOneQueue) { using element = std::string; using qtype = spsc::circular_fifo; auto queue = queue_api::CreateQueue(10); @@ -23,7 +23,7 @@ TEST(MPSC_SPMC, CreateOneQueue_MPSC) { auto temporary = std::get(queue); // convert the setup to a MPSC setup - mpsc::round_robin::Receiver consumer({temporary}); + mpsc::fixed_size::round_robin::Receiver consumer({temporary}); std::string e; EXPECT_FALSE(consumer.pop(e)); @@ -35,7 +35,7 @@ TEST(MPSC_SPMC, CreateOneQueue_MPSC) { EXPECT_TRUE(consumer.lock_free()); } -TEST(MPSC_SPMC, CreateManyQueues) { +TEST(MultipleProducers_SingleConsumer, CreateManyQueues) { using element = std::string; using qtype = spsc::circular_fifo; constexpr auto senderID = queue_api::index::sender; @@ -51,18 +51,18 @@ TEST(MPSC_SPMC, CreateManyQueues) { receivers.push_back(std::get(queue)); } - mpsc::round_robin::Receiver consumer({receivers}); + mpsc::fixed_size::round_robin::Receiver consumer({receivers}); EXPECT_EQ(kSizeTotal, consumer.capacity()); EXPECT_EQ(kSizeTotal, consumer.capacity_free()); } -TEST(MPSC_SPMC, RoundRobinOfOne) { +TEST(MultipleProducers_SingleConsumer, RoundRobinOfOne) { using element = std::string; using qtype = spsc::circular_fifo; auto queue = queue_api::CreateQueue(2); auto temporary = std::get(queue); // convert the setup to a MPSC setup - mpsc::round_robin::Receiver consumer({temporary}); + mpsc::fixed_size::round_robin::Receiver consumer({temporary}); size_t current = 0; current = consumer.increment(current); @@ -71,7 +71,7 @@ TEST(MPSC_SPMC, RoundRobinOfOne) { EXPECT_EQ(0, current); } -TEST(MPSC_SPMC, RoundRobinOfMany) { +TEST(MultipleProducers_SingleConsumer, RoundRobinOfMany) { using element = std::string; using qtype = spsc::circular_fifo; auto q1 = queue_api::CreateQueue(2); @@ -81,7 +81,7 @@ TEST(MPSC_SPMC, RoundRobinOfMany) { // convert the setup to a MPSC setup // - mpsc::round_robin::Receiver consumer({r1, r2}); + mpsc::fixed_size::round_robin::Receiver consumer({r1, r2}); size_t current = 0; current = consumer.increment(current); @@ -92,7 +92,7 @@ TEST(MPSC_SPMC, RoundRobinOfMany) { EXPECT_EQ(1, current); } -TEST(MPSC_SPMC, full) { +TEST(MultipleProducers_SingleConsumer, full) { using element = std::string; using qtype = spsc::circular_fifo; auto q1 = queue_api::CreateQueue(1); @@ -104,7 +104,7 @@ TEST(MPSC_SPMC, full) { // convert the setup to a MPSC setup // - mpsc::round_robin::Receiver consumer({r1, r2}); + mpsc::fixed_size::round_robin::Receiver consumer({r1, r2}); std::string arg; s1.push(arg); EXPECT_TRUE(r1.full()); @@ -117,7 +117,7 @@ TEST(MPSC_SPMC, full) { EXPECT_TRUE(consumer.full()); } -TEST(MPSC_SPMC, size) { +TEST(MultipleProducers_SingleConsumer, size) { using element = std::string; using qtype = spsc::circular_fifo; auto q1 = queue_api::CreateQueue(1); @@ -129,7 +129,7 @@ TEST(MPSC_SPMC, size) { // convert the setup to a MPSC setup // - mpsc::round_robin::Receiver consumer({r1, r2}); + mpsc::fixed_size::round_robin::Receiver consumer({r1, r2}); std::string arg; EXPECT_EQ(2, consumer.capacity()); EXPECT_EQ(2, consumer.capacity_free()); @@ -142,7 +142,7 @@ TEST(MPSC_SPMC, size) { EXPECT_EQ(0, consumer.capacity_free()); } -TEST(MPSC_SPMC, pop) { +TEST(MultipleProducers_SingleConsumer, pop) { using element = std::string; using qtype = spsc::circular_fifo; auto q1 = queue_api::CreateQueue(1); @@ -154,7 +154,7 @@ TEST(MPSC_SPMC, pop) { // convert the setup to a MPSC setup // - mpsc::round_robin::Receiver consumer({r1, r2}); + mpsc::fixed_size::round_robin::Receiver consumer({r1, r2}); std::string arg = "s0"; s1.push(arg); @@ -177,7 +177,7 @@ TEST(MPSC_SPMC, pop) { EXPECT_EQ("", recv); } -TEST(MPSC_SPMC, usage) { +TEST(MultipleProducers_SingleConsumer, usage) { using element = std::string; using qtype = spsc::circular_fifo; auto q1 = queue_api::CreateQueue(1); @@ -189,7 +189,7 @@ TEST(MPSC_SPMC, usage) { // convert the setup to a MPSC setup // - mpsc::round_robin::Receiver consumer({r1, r2}); + mpsc::fixed_size::round_robin::Receiver consumer({r1, r2}); std::string arg; EXPECT_EQ(0, consumer.usage()); s1.push(arg); @@ -197,81 +197,4 @@ TEST(MPSC_SPMC, usage) { s2.push(arg); EXPECT_EQ(100, consumer.usage()); -} - -TEST(MPSC_SPMC, CreateOneQueue_SPMC) { - using element = std::string; - using qtype = spsc::circular_fifo; - auto queue = queue_api::CreateQueue(10); - auto temporary = std::get(queue); - auto consumer = std::get(queue); - - // convert the setup to a MPSC setup - spmc::round_robin::Sender producer({temporary}); - - std::string e; - EXPECT_TRUE(producer.empty()); - EXPECT_FALSE(producer.full()); - EXPECT_EQ(10, producer.capacity()); - EXPECT_EQ(10, producer.capacity_free()); - EXPECT_EQ(0, producer.size()); - EXPECT_TRUE(producer.lock_free()); - EXPECT_TRUE(producer.push(e)); -} - -TEST(MPSC_SPMC, CreateManyQueues_SPMC) { - using element = std::string; - using qtype = spsc::circular_fifo; - constexpr auto senderID = queue_api::index::sender; - constexpr auto receiverID = queue_api::index::receiver; - - constexpr size_t kSize = 10; - constexpr size_t kSizeTotal = kSize * kSize; - std::vector> senders; - std::vector> receivers; - for (size_t i = 0; i < 10; ++i) { - auto queue = queue_api::CreateQueue(10); - senders.push_back(std::get(queue)); - receivers.push_back(std::get(queue)); - } - - spmc::round_robin::Sender consumer({senders}); - EXPECT_EQ(kSizeTotal, consumer.capacity()); - EXPECT_EQ(kSizeTotal, consumer.capacity_free()); -} - -TEST(MPSC_SPMC, push_SPMC) { - using element = std::string; - using qtype = spsc::circular_fifo; - auto q1 = queue_api::CreateQueue(1); - auto q2 = queue_api::CreateQueue(1); - auto r1 = std::get(q1); - auto s1 = std::get(q1); - auto r2 = std::get(q2); - auto s2 = std::get(q2); - - // convert the setup to a MPSC setup - // - spmc::round_robin::Sender producer({s1, s2}); - std::string arg = "s0"; - producer.push(arg); - - std::string recv; - EXPECT_FALSE(r2.pop(recv)); - EXPECT_TRUE(r1.pop(recv)); - EXPECT_EQ("s0", recv); - - arg = "s1"; - producer.push(arg); - arg = "s2"; - producer.push(arg); - - EXPECT_TRUE(r2.pop(recv)); - EXPECT_EQ("s1", recv); - EXPECT_TRUE(r1.pop(recv)); - EXPECT_EQ("s2", recv); - - EXPECT_TRUE(producer.push(arg)); - EXPECT_TRUE(producer.push(arg)); - EXPECT_FALSE(producer.push(arg)); -} +} \ No newline at end of file diff --git a/test/test_spmc_round_robin_queues.cpp b/test/test_spmc_round_robin_queues.cpp new file mode 100644 index 0000000..395bbb6 --- /dev/null +++ b/test/test_spmc_round_robin_queues.cpp @@ -0,0 +1,92 @@ +/* Not any company's property but Public-Domain +* Do with source-code as you will. No requirement to keep this +* header if need to use it/change it/ or do whatever with it +* +* Note that there is No guarantee that this code will work +* and I take no responsibility for this code and any problems you +* might get if using it. +* Kjell Hedstrom, 2018 +*/ + +#include +#include +#include "q/q_api.hpp" +#include "q/spmc_sender_round_robin.hpp" +#include "q/spsc_circular_fifo.hpp" + +TEST(SingleProducer_MultipleConsumers, CreateOneQueue) { + using element = std::string; + using qtype = spsc::circular_fifo; + auto queue = queue_api::CreateQueue(10); + auto temporary = std::get(queue); + auto consumer = std::get(queue); + + // convert the setup to a MPSC setup + spmc::fixed_size::round_robin::Sender producer({temporary}); + + std::string e; + EXPECT_TRUE(producer.empty()); + EXPECT_FALSE(producer.full()); + EXPECT_EQ(10, producer.capacity()); + EXPECT_EQ(10, producer.capacity_free()); + EXPECT_EQ(0, producer.size()); + EXPECT_TRUE(producer.lock_free()); + EXPECT_TRUE(producer.push(e)); +} + +TEST(SingleProducer_MultipleConsumers, CreateManyQueues) { + using element = std::string; + using qtype = spsc::circular_fifo; + constexpr auto senderID = queue_api::index::sender; + constexpr auto receiverID = queue_api::index::receiver; + + constexpr size_t kSize = 10; + constexpr size_t kSizeTotal = kSize * kSize; + std::vector> senders; + std::vector> receivers; + for (size_t i = 0; i < 10; ++i) { + auto queue = queue_api::CreateQueue(10); + senders.push_back(std::get(queue)); + receivers.push_back(std::get(queue)); + } + + spmc::fixed_size::round_robin::Sender consumer({senders}); + EXPECT_EQ(kSizeTotal, consumer.capacity()); + EXPECT_EQ(kSizeTotal, consumer.capacity_free()); +} + +TEST(SingleProducer_MultipleConsumers, push) { + using element = std::string; + using qtype = spsc::circular_fifo; + auto q1 = queue_api::CreateQueue(1); + auto q2 = queue_api::CreateQueue(1); + auto r1 = std::get(q1); + auto s1 = std::get(q1); + auto r2 = std::get(q2); + auto s2 = std::get(q2); + + // convert the setup to a MPSC setup + // + spmc::fixed_size::round_robin::Sender producer({s1, s2}); + std::string arg = "s0"; + producer.push(arg); + + std::string recv; + EXPECT_FALSE(r2.pop(recv)); + EXPECT_TRUE(r1.pop(recv)); + EXPECT_EQ("s0", recv); + + arg = "s1"; + producer.push(arg); + arg = "s2"; + producer.push(arg); + + EXPECT_TRUE(r2.pop(recv)); + EXPECT_EQ("s1", recv); + EXPECT_TRUE(r1.pop(recv)); + EXPECT_EQ("s2", recv); + + EXPECT_TRUE(producer.push(arg)); + EXPECT_TRUE(producer.push(arg)); + EXPECT_FALSE(producer.push(arg)); +}