Skip to content

Commit

Permalink
Simplify - KISS SPSC (#22)
Browse files Browse the repository at this point in the history
* KISS, for now only one type of lock-free SPSC queue
* rename file
  • Loading branch information
KjellKod authored Jan 4, 2024
1 parent 7a22fcd commit 9004f77
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 318 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ if ( NOT VERSION )
execute_process(COMMAND bash "-c" "git rev-list --branches HEAD | wc -l | tr -d ' ' | tr -d '\n'" OUTPUT_VARIABLE GIT_VERSION WORKING_DIRECTORY ${CMAKE_SOURCE_DIR})
endif()

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

message( STATUS "Compiler: ${CMAKE_C_COMPILER}")
message(STATUS "C++ version: ${CMAKE_CXX_STANDARD}")

set(MINOR_VERSION 1)
math(EXPR VERSION-BASE ${GIT_VERSION}/255)
Expand Down Expand Up @@ -44,7 +48,7 @@ endif()
# change to c++20 in version 2.0
if (MSVC)
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wunused -std=c++17 -pthread -D_GLIBCXX_USE_NANOSLEEP")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wunused -pthread -D_GLIBCXX_USE_NANOSLEEP")
endif()

if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ All queues are in-process, thread-to-thread queues.
This lock-free queue is safe to use between one producer thread and one consumer thread.
**SPSC lock-free options:**
1. `fixed_circular_fifo`: Set the size of the queue in your code, the size is set during compiled time.
1. `flexible_circular_fifo`: Set the size of the queue in the constructor.
1. `circular_fifo`: Set the size of the queue in the constructor.

_The SPSC is a powerful building block from which you can create more lock-free complicated queue structures if number of producers and consumers are known at creation time._

Expand All @@ -20,11 +20,11 @@ Please see [benchmark documentation](benchmark.md) for details.
# NOT YET DOCUMENTED API

2. **MPMC:** *multiple producer, multiple consumer*
- `lock flexible-lock-queue`: runtime, at construction, set max size of queue or set to unlimited in size
- `dynamically sized, mutex-lock-queue`: runtime, at construction, set max size of queue or set to unlimited in size
3. **MPSC:** *multiple producer, singe consumer*
- `flexible or fixed lock-free circular fifo`: Using fair scheduling the many SPSC queues are consumed in an optimized round-robin manner
- `lock-free circular fifo`: Using fair scheduling the many SPSC queues are consumed in an optimized round-robin manner
4. **SPMC:** *single producer, multiple consumer*
- `flexible or fixed lock-free circular fifo`: Using fair scheduling the producer transfers over many SPSC queues
- `lock-free circular fifo`: Using fair scheduling the producer transfers over many SPSC queues



Expand Down
4 changes: 2 additions & 2 deletions benchmark/benchmark_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "benchmark_runs.hpp"
#include "q/mpmc_lock_queue.hpp"
#include "q/q_api.hpp"
#include "q/spsc_flexible_circular_fifo.hpp"
#include "q/spsc_circular_fifo.hpp"

namespace {
const size_t kGoodSizedQueueSize = (2 << 16); // 65536
Expand Down Expand Up @@ -76,7 +76,7 @@ int main() {
// Print the headers
std::cout << "#runs,\t#p,\t#c,\t#msgs/s,\t#min_msgs/s,\t#max_msgs/s,\tavg call [ns],\tcomment" << std::endl;

auto spsc_result = benchmark_queue<spsc::flexible::circular_fifo<unsigned int>>("SPSC benchmark");
auto spsc_result = benchmark_queue<spsc::circular_fifo<unsigned int>>("SPSC benchmark");
print_result(spsc_result);

auto spsc_lockqueue_result = benchmark_queue<mpmc::lock_queue<unsigned int>>("SPSC using the lock-based MPMC benchmark");
Expand Down
24 changes: 12 additions & 12 deletions benchmark/test_performance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

// using namespace test_performance;

// TEST(Performance, SPSC_Flexible_CircularFifo) {
// auto queue = queue_api::CreateQueue<spsc::flexible::circular_fifo<std::string>>(kGoodSizedQueueSize);
// TEST(Performance, SPSC_circular_fifo_CircularFifo) {
// auto queue = queue_api::CreateQueue<spsc::circular_fifo<std::string>>(kGoodSizedQueueSize);
// auto result = RunSPS2C(queue, kAmount);
// EXPECT_EQ(result.size(), 3);
// auto sent = result[0];
Expand All @@ -38,13 +38,13 @@
// std::cout << "messages / second " << kAmount/(runner.nanotime * 1000000000)
// }

// // TEST(Performance, SPSC_Flexible_CircularFifo) {
// // auto queue = queue_api::CreateQueue<spsc::flexible::circular_fifo<std::string>>(kSmallQueueSize);
// // TEST(Performance, SPSC_circular_fifo_CircularFifo) {
// // auto queue = queue_api::CreateQueue<spsc::circular_fifo<std::string>>(kSmallQueueSize);
// // RunSPSC(queue, kAmount);
// // }

// // TEST(Performance, SPSC_Flexible_CircularFifo_Smaller) {
// // auto queue = queue_api::CreateQueue<spsc::flexible::circular_fifo<std::string>>(kSmallQueueSize);
// // TEST(Performance, SPSC_circular_fifo_CircularFifo_Smaller) {
// // auto queue = queue_api::CreateQueue<spsc::circular_fifo<std::string>>(kSmallQueueSize);
// // RunSPSC(queue, kAmount);
// // }

Expand Down Expand Up @@ -73,9 +73,9 @@
// // RunSPSC(queue, kAmount);
// // }

// // TEST(Performance, DISABLED_SPSC_Flexible_20secRun_LargeData) {
// // TEST(Performance, DISABLED_SPSC_circular_fifo_20secRun_LargeData) {
// // using namespace std;
// // auto queue = queue_api::CreateQueue<spsc::flexible::circular_fifo<std::string>>(kSmallQueueSize);
// // auto queue = queue_api::CreateQueue<spsc::circular_fifo<std::string>>(kSmallQueueSize);
// // const size_t large = 65000;
// // std::string payload(large, 'x');
// // EXPECT_EQ(large, payload.size());
Expand Down Expand Up @@ -124,7 +124,7 @@
// // TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_LargeData) {
// // using namespace std;
// // using element = std::string;
// // using qtype = spsc::flexible::circular_fifo<element>;
// // using qtype = spsc::circular_fifo<element>;
// // using qtype_pair = std::tuple<queue_api::Sender<qtype>, queue_api::Receiver<qtype>>;
// // std::vector<qtype_pair> queues;
// // for (size_t i = 0; i < 4; ++i) {
Expand All @@ -141,7 +141,7 @@
// // TEST(Performance, DISABLED_lock_free__SPMC_1_to_4_20secRun_SmallData) {
// // using namespace std;
// // using element = std::string;
// // using qtype = spsc::flexible::circular_fifo<element>;
// // using qtype = spsc::circular_fifo<element>;
// // using qtype_pair = std::tuple<queue_api::Sender<qtype>, queue_api::Receiver<qtype>>;
// // std::vector<qtype_pair> queues;
// // for (size_t i = 0; i < 4; ++i) {
Expand Down Expand Up @@ -182,7 +182,7 @@
// // TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_LargeData) {
// // using namespace std;
// // using element = std::string;
// // using qtype = spsc::flexible::circular_fifo<element>;
// // using qtype = spsc::circular_fifo<element>;
// // using qtype_pair = std::tuple<queue_api::Sender<qtype>, queue_api::Receiver<qtype>>;
// // std::vector<qtype_pair> queues;
// // for (size_t i = 0; i < 4; ++i) {
Expand All @@ -199,7 +199,7 @@
// // TEST(Performance, DISABLED_lock_free__MPSC_4_to_1_20secRun_SmallData) {
// // using namespace std;
// // using element = std::string;
// // using qtype = spsc::flexible::circular_fifo<element>;
// // using qtype = spsc::circular_fifo<element>;
// // using qtype_pair = std::tuple<queue_api::Sender<qtype>, queue_api::Receiver<qtype>>;
// // std::vector<qtype_pair> queues;
// // for (size_t i = 0; i < 4; ++i) {
Expand Down
8 changes: 4 additions & 4 deletions examples/spsc_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
#include "q/spsc.hpp"

// Alias for easier readability
using spsc_queue_type = spsc::flexible::circular_fifo<std::string>;
using spsc_queue_type = spsc::circular_fifo<std::string>;
using sender_type = std::shared_ptr<queue_api::Sender<spsc_queue_type>>;
using receiver_type = std::shared_ptr<queue_api::Receiver<spsc_queue_type>>;

void produceMessages(queue_api::Sender<spsc::flexible::circular_fifo<std::string>>& sender, std::atomic<bool>& should_continue_working) {
void produceMessages(queue_api::Sender<spsc::circular_fifo<std::string>>& sender, std::atomic<bool>& should_continue_working) {
std::vector<std::string> greetings = {"Hello", "Bonjour", "Tjena", "Ciao", "Hola", "Hallo", "Hei", "Aloha", "Shalom", "Namaste",
"Hello", "Bonjour", "Tjena", "Ciao", "Hola", "Hallo", "Hei", "Aloha", "Shalom", "Namaste"};

Expand Down Expand Up @@ -43,8 +43,8 @@ void consumeMessages(ReceiverQ& receiver, std::atomic<bool>& keep_working) {
}

int main() {
// Create a flexible SPSC queue with a dynamic size, determined at runtime initialization.
auto queue = queue_api::CreateQueue<spsc::flexible::circular_fifo<std::string>>(10);
// Create a circular_fifo SPSC queue with a dynamic size, determined at runtime initialization.
auto queue = queue_api::CreateQueue<spsc::circular_fifo<std::string>>(10);

// Get the sender and receiver endpoints of the queue
auto senderQ = std::get<queue_api::index::sender>(queue);
Expand Down
5 changes: 2 additions & 3 deletions spsc.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

This lock-free queue is safe to use between one producer thread and one consumer thread.
**SPSC lock-free options:**
1. `fixed_circular_fifo`: Set the size of the queue in your code, the size is set during compiled time.
1. `flexible_circular_fifo`: Set the size of the queue in the constructor.
1. `circular_fifo`: Set the size of the queue in the constructor.

_The SPSC is a powerful building block from which you can create more lock-free complicated queue structures if number of producers and consumers are known at creation time._

**SPSC Naive Example**
The raw and "unsafe" way to create the queue is to just "create it". This makes it harder to use right as the producer and consumer threads must only touch "their" parts of the API or the queue would not be thread-safe.
```
using spsc_queue_type = spsc::flexible::circular_fifo<string>;
using spsc_queue_type = spsc::circular_fifo<string>;
auto q_size = 1000;
auto spsc_queue = spsc_queue_type(q_size);
// through spsc_queue the FULL producer/consumer API is available,
Expand Down
2 changes: 1 addition & 1 deletion src/q/mpsc_receiver_round_robin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <vector>
#include "q/q_api.hpp"
#include "q/round_robin_api.hpp"
#include "q/spsc_flexible_circular_fifo.hpp"
#include "q/spsc_circular_fifo.hpp"

// MPSC : Many Single Producers - Single Consumer
namespace mpsc {
Expand Down
2 changes: 1 addition & 1 deletion src/q/spmc_sender_round_robin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include <vector>
#include "q/q_api.hpp"
#include "q/round_robin_api.hpp"
#include "q/spsc_flexible_circular_fifo.hpp"
#include "q/spsc_circular_fifo.hpp"

// MPSC : Many Single Producers - Single Consumer
namespace spmc {
Expand Down
3 changes: 1 addition & 2 deletions src/q/spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@

#pragma once

#include "q/spsc_fixed_circular_fifo.hpp"
#include "q/spsc_flexible_circular_fifo.hpp"
#include "q/spsc_circular_fifo.hpp"
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <vector>

namespace spsc {
namespace flexible {
template <typename Element>
class circular_fifo {
public:
Expand Down Expand Up @@ -138,5 +137,4 @@ namespace spsc {
size_t circular_fifo<Element>::usage() const {
return (100 * size() / kSize);
}
} // namespace flexible
} // namespace spsc
126 changes: 0 additions & 126 deletions src/q/spsc_fixed_circular_fifo.hpp

This file was deleted.

Loading

0 comments on commit 9004f77

Please sign in to comment.