diff --git a/benchmark/benchmark_main.cpp b/benchmark/benchmark_main.cpp index f02ea25..b74098e 100644 --- a/benchmark/benchmark_main.cpp +++ b/benchmark/benchmark_main.cpp @@ -2,6 +2,7 @@ #include #include "benchmark_functions.hpp" #include "benchmark_runs.hpp" +#include "q/mpmc_lock_queue.hpp" #include "q/q_api.hpp" #include "q/spsc_flexible_circular_fifo.hpp" @@ -22,16 +23,20 @@ namespace { } // namespace void print_result(const benchmark_result& result) { - std::cout << std::setw(2) << result.runs << ", " + double average_call_time_ns = result.mean_msgs_per_second ? 1e9 / result.mean_msgs_per_second : 0; + + std::cout << std::left << std::setw(2) << result.runs << ", " << std::setw(5) << result.num_producer_threads << ", " << std::setw(6) << result.num_consumer_threads << ", " << std::setw(16) << result.mean_msgs_per_second << ", " << std::setw(14) << result.min_msgs_per_second << ", " << std::setw(14) << result.max_msgs_per_second << ", " - << std::setw(17) << result.comment << std::endl; + << std::setw(10) << average_call_time_ns << "," + << std::setw(30) << result.comment << std::endl; } -benchmark_result benchmark_spsc() { +template +benchmark_result benchmark_queue(const std::string& comment) { const int kRuns = 33; std::vector results; double total_duration_ns = 0.0; @@ -40,7 +45,7 @@ benchmark_result benchmark_spsc() { double total_msgs_per_second = 0.0; for (int i = 0; i < kRuns; ++i) { - auto queue = queue_api::CreateQueue>(kGoodSizedQueueSize); + auto queue = queue_api::CreateQueue(kGoodSizedQueueSize); auto result = benchmark::runSPSC(queue, kNumberOfItems); results.push_back(result); total_duration_ns += result.elapsed_time_in_ns; @@ -62,16 +67,56 @@ benchmark_result benchmark_spsc() { result.mean_msgs_per_second = mean_msgs_per_second; result.min_msgs_per_second = min_msgs_per_second; result.max_msgs_per_second = max_msgs_per_second; - result.comment = "SPSC benchmark"; + result.comment = comment; return result; } int main() { // Print the headers - std::cout << "#runs,\t#p,\t#c,\t#msgs/s,\t#min_msgs/s,\t#max_msgs/s,\tcomment" << std::endl; - auto spsc_result = benchmark_spsc(); + std::cout << "#runs,\t#p,\t#c,\t#msgs/s,\t#min_msgs/s,\t#max_msgs/s,\tavg call [ns],\tcomment" << std::endl; + + auto spsc_result = benchmark_queue>("SPSC benchmark"); print_result(spsc_result); + auto spsc_lockqueue_result = benchmark_queue>("SPSC using the lock-based MPMC benchmark"); + print_result(spsc_lockqueue_result); + return 0; -} \ No newline at end of file +} + +// benchmark_result benchmark_mpmc() { +// const int kRuns = 33; +// std::vector results; +// double total_duration_ns = 0.0; +// double min_msgs_per_second = std::numeric_limits::max(); +// double max_msgs_per_second = std::numeric_limits::min(); +// double total_msgs_per_second = 0.0; + +// for (int i = 0; i < kRuns; ++i) { +// auto queue = queue_api::CreateQueue>(kGoodSizedQueueSize); +// auto result = benchmark::runSPSC(queue, kNumberOfItems); +// results.push_back(result); +// total_duration_ns += result.elapsed_time_in_ns; + +// double duration_s = result.elapsed_time_in_ns / 1e9; // convert ns to seconds +// double msgs_per_second = kNumberOfItems / duration_s; // messages per second for this run +// total_msgs_per_second += msgs_per_second; +// min_msgs_per_second = std::min(min_msgs_per_second, msgs_per_second); +// max_msgs_per_second = std::max(max_msgs_per_second, msgs_per_second); +// } + +// double mean_msgs_per_second = total_msgs_per_second / kRuns; // mean messages per second + +// benchmark_result result; +// result.runs = kRuns; +// result.num_producer_threads = 1; +// result.num_consumer_threads = 1; +// result.messages_per_iteration = kNumberOfItems; +// result.mean_msgs_per_second = mean_msgs_per_second; +// result.min_msgs_per_second = min_msgs_per_second; +// result.max_msgs_per_second = max_msgs_per_second; +// result.comment = "SPSC using the lock-based MPMC benchmark"; + +// return result; +// } \ No newline at end of file diff --git a/src/q/benchmark_functions.hpp b/src/q/benchmark_functions.hpp new file mode 100644 index 0000000..248d179 --- /dev/null +++ b/src/q/benchmark_functions.hpp @@ -0,0 +1,136 @@ +/* +* Not any company's property but Public-Domain +* Do with source-code as you will. No requirement to keep this +* header if need to use it/change it/ or do whatever with it +* +* Note that there is No guarantee that this code will work +* and I take no responsibility for this code and any problems you +* might get if using it. +* +* Originally published at https://github.com/KjellKod/Q +*/ + +#pragma once +#include +#include +#include +#include +#include +#include +#include "q/q_api.hpp" +#include "stopwatch.hpp" + +#define Q_CHECK_EQ(value1, value2) \ + do { \ + if ((value1) != (value2)) { \ + std::cout << "Line " << __LINE__ << ": CHECK failed, " \ + << (value1) << " is not equal with " << (value2) << std::endl; \ + assert((value1) == (value2)); \ + } \ + } while (0) + +#define Q_CHECK(expr) \ + do { \ + if (!(expr)) { \ + std::cout << "Line " << __LINE__ << ": CHECK failed, " \ + << #expr << " is not true" << std::endl; \ + assert((expr)); \ + } \ + } while (0) + +namespace benchmark { + struct result_t { + uint64_t total_sum; + uint64_t elapsed_time_in_ns; + }; + + const std::chrono::milliseconds kMaxWaitMs(1000); + + template + result_t Push(Sender q, const size_t stop, std::atomic& producerStart, std::atomic& consumerStart) { + using namespace std::chrono_literals; + producerStart.store(true); + while (!consumerStart.load()) { + std::this_thread::sleep_for(10ns); + } + benchmark::stopwatch watch; + uint64_t sum = 0; + for (unsigned int i = 1; i <= stop; ++i) { + Q_CHECK(q.wait_and_push(i, kMaxWaitMs)); + sum += i; + } + return {sum, watch.elapsed_ns()}; + } + + template + result_t Get(Receiver q, size_t stop, std::atomic& producerStart, std::atomic& consumerStart) { + using namespace std::chrono_literals; + consumerStart.store(true); + while (!producerStart.load()) { + std::this_thread::sleep_for(10ns); + } + benchmark::stopwatch watch; + uint64_t sum = 0; + for (;;) { + unsigned int value = 0; + Q_CHECK(q.wait_and_pop(value, kMaxWaitMs)); + sum += value; + if (value == stop) { + break; + } + } + return {sum, watch.elapsed_ns()}; + } +} // namespace benchmark + +// template +// size_t PushUntil(Sender q, std::string data, +// std::atomic& producerCount, +// std::atomic& stopRunning) { +// using namespace std::chrono_literals; +// producerCount++; +// using namespace std::chrono_literals; + +// benchmark::stopwatch watch; +// size_t amountPushed = 0; +// while (!stopRunning.load(std::memory_order_relaxed)) { +// std::string value = data; +// while (false == q.push(value) && !stopRunning.load(std::memory_order_relaxed)) { +// std::this_thread::sleep_for(100ns); // yield is too aggressive +// } +// ++amountPushed; +// } +// return amountPushed; +// } + +// // template +// // size_t GetUntil(Receiver q, const std::string data, +// // std::atomic& consumerCount, +// // std::atomic& stopRunning) { +// // using namespace std::chrono_literals; +// // consumerCount++; + +// // benchmark::stopwatch watch; +// // size_t amountReceived = 0; +// // size_t byteReceived = 0; +// // while (!stopRunning.load(std::memory_order_relaxed)) { +// // std::string value; +// // bool result = false; +// // std::chrono::milliseconds wait{10}; +// // while (!(result = q.wait_and_pop(value, wait))) { +// // if (stopRunning.load(std::memory_order_relaxed)) { +// // break; +// // } +// // } +// // if (result) { +// // EXPECT_EQ(data.size(), value.size()); +// // EXPECT_FALSE(value.was_was_empty()); +// // ++amountReceived; +// // byteReceived += value.size(); +// // } +// // } +// // std::ostringstream oss; +// // oss << "Bytes received: " << byteReceived << std::endl; +// // std::cout << oss.str(); +// // return amountReceived; +// // } \ No newline at end of file diff --git a/src/q/mpmc_lock_queue.hpp b/src/q/mpmc_lock_queue.hpp new file mode 100644 index 0000000..ecc113e --- /dev/null +++ b/src/q/mpmc_lock_queue.hpp @@ -0,0 +1,175 @@ +/** ========================================================================== +* Not any company's property but Public-Domain +* Do with source-code as you will. No requirement to keep this +* header if need to use it/change it/ or do whatever with it +* +* Note that there is No guarantee that this code will work +* and I take no responsibility for this code and any problems you +* might get if using it. +* +* ========================================================================== +* 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes +* with no warranties. This code is yours to share, use and modify with no +* strings attached and no restrictions or obligations. +* +* For more information see g3log/LICENSE or refer refer to http://unlicense.org +* ============================================================================ +* +* Example of a normal std::queue protected by a mutex for operations, +* making it safe for thread communication, using std::mutex from C++0x with +* the help from the std::thread library from JustSoftwareSolutions +* ref: http://www.stdthread.co.uk/doc/headers/mutex.html +* +* This exampel was totally inspired by Anthony Williams lock-based data structures in +* Ref: "C++ Concurrency In Action" http://www.manning.com/williams */ + +#pragma once + +#include +#include +#include +#include +#include + +/** Multiple producer, multiple consumer (mpmc) thread safe queue +* protected by mutex. Since 'return by reference' is used this queue won't throw */ +namespace mpmc { + template + class lock_queue { + static const int kUnlimited = -1; + static const int kSmallDefault = 100; + const int kMaxSize; + std::queue queue_; + mutable std::mutex m_; + std::condition_variable data_cond_; + + lock_queue& operator=(const lock_queue&) = delete; + lock_queue(const lock_queue& other) = delete; + + bool internal_full() const; + size_t internal_capacity() const; + + public: + // -1 : unbounded + // 0 ... N : bounded (0 is silly) + lock_queue(const int maxSize = kSmallDefault); + + bool lock_free() const; + bool push(T& item); + bool pop(T& popped_item); + bool wait_and_pop(T& popped_item, std::chrono::milliseconds max_wait); + bool full(); + bool empty() const; + size_t size() const; + size_t capacity() const; + size_t capacity_free() const; + size_t usage() const; + }; + + // maxSize of -1 equals unlimited size + template + lock_queue::lock_queue(int maxSize) : + kMaxSize(maxSize) {} + + template + bool lock_queue::lock_free() const { + return false; + } + + template + bool lock_queue::push(T& item) { + { + std::lock_guard lock(m_); + if (internal_full()) { + return false; + } + queue_.push(std::move(item)); + } // lock_guard off + data_cond_.notify_one(); + return true; + } + + template + bool lock_queue::pop(T& popped_item) { + std::lock_guard lock(m_); + if (queue_.empty()) { + return false; + } + popped_item = std::move(queue_.front()); + queue_.pop(); + return true; + } + + template + bool lock_queue::wait_and_pop(T& popped_item, std::chrono::milliseconds max_wait) { + std::unique_lock lock(m_); + auto const timeout = std::chrono::steady_clock::now() + max_wait; + while (queue_.empty()) { + if (data_cond_.wait_until(lock, timeout) == std::cv_status::timeout) { + break; + } + // This 'while' loop is equal to + // data_cond_.wait(lock, [](bool result){return !queue_.empty();}); + } + if (queue_.empty()) { + return false; + } + + popped_item = std::move(queue_.front()); + queue_.pop(); + return true; + } + + template + bool lock_queue::full() { + std::lock_guard lock(m_); + return internal_full(); + } + + template + bool lock_queue::empty() const { + std::lock_guard lock(m_); + return queue_.empty(); + } + + template + size_t lock_queue::size() const { + std::lock_guard lock(m_); + return queue_.size(); + } + + template + size_t lock_queue::capacity() const { + std::lock_guard lock(m_); + return internal_capacity(); + } + + template + size_t lock_queue::capacity_free() const { + std::lock_guard lock(m_); + return internal_capacity() - queue_.size(); + } + + template + size_t lock_queue::usage() const { + std::lock_guard lock(m_); + return (100 * queue_.size() / internal_capacity()); + } + + // private + template + size_t lock_queue::internal_capacity() const { + if (kMaxSize == kUnlimited) { + return std::numeric_limits::max(); + } + return kMaxSize; + } + + template + bool lock_queue::internal_full() const { + if (kMaxSize == kUnlimited) { + return false; + } + return (queue_.size() >= static_cast(kMaxSize)); + } +} // namespace mpmc diff --git a/src/q/test_queue.cpp b/src/q/test_queue.cpp new file mode 100644 index 0000000..4194e92 --- /dev/null +++ b/src/q/test_queue.cpp @@ -0,0 +1,416 @@ +/* Not any company's property but Public-Domain +* Do with source-code as you will. No requirement to keep this +* header if need to use it/change it/ or do whatever with it +* +* Note that there is No guarantee that this code will work +* and I take no responsibility for this code and any problems you +* might get if using it. +* +* Originally published at: https://github.com/KjellKod/Q +*/ +#include +#include +#include +#include +#include +#include "stopwatch.hpp" + +using namespace std; +using Type = string; +using FlexibleQ = spsc::flexible::circular_fifo; +using FixedQ = spsc::fixed::circular_fifo; +using FixedSmallQ = spsc::fixed::circular_fifo; +using LockedQ = mpmc::lock_queue; + +template +void ProdConsInitialization(Prod& prod, Cons& cons) { + EXPECT_TRUE(prod.empty()); + EXPECT_TRUE(cons.empty()); + + EXPECT_FALSE(prod.full()); + EXPECT_FALSE(cons.full()); + + EXPECT_TRUE(prod.lock_free()); + EXPECT_TRUE(cons.lock_free()); + + EXPECT_EQ(10, prod.capacity()); + EXPECT_EQ(10, cons.capacity()); + + EXPECT_EQ(10, prod.capacity_free()); + EXPECT_EQ(10, cons.capacity_free()); + + EXPECT_EQ(0, prod.size()); + EXPECT_EQ(0, cons.size()); +} + +TEST(Queue, ProdConsInitialization) { + auto queue = queue_api::CreateQueue(10); + auto producer = std::get(queue); + auto consumer = std::get(queue); +} + +TEST(Queue, ProdConsInitializationCopy) { + using namespace queue_api; + using QueuePair = std::pair, Receiver>; + QueuePair queue = CreateQueue(10); + Sender sender1 = std::get(queue); + Sender sender2(std::get(queue)); + Receiver receiver1 = std::get(queue); + Receiver receiver2(std::get(queue)); +} + +struct HasWaitAndPop { + std::chrono::milliseconds value; + std::string element; + HasWaitAndPop() : + value(0) {} + bool wait_and_pop(std::string& x, std::chrono::milliseconds wait_ms) { + value = wait_ms; + element = x; + return false; + } + + bool pop(std::string& x) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10s); + return true; + } +}; + +struct HasPop { + size_t value; + std::string element; + HasPop() : + value(0) {} + bool pop(std::string& x) { + element = x; + return false; // to ensure that the wait is triggered + } +}; + +struct HasWaitAndPush { + std::chrono::milliseconds value; + std::string element; + HasWaitAndPush() : + value(0) {} + bool wait_and_push(std::string& x, std::chrono::milliseconds wait_ms) { + value = wait_ms; + element = x; + return false; + } + + bool push(std::string& x) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(10s); + return true; + } +}; + +struct HasPush { + size_t value; + std::string element; + HasPush() : + value(0) {} + bool push(std::string& x) { + element = x; + return false; // to ensure that the wait is triggered + } +}; + +TEST(Queue, BaseAPI_Flexible) { + auto queue = queue_api::CreateQueue(10); + auto producer = std::get(queue); + auto consumer = std::get(queue); + EXPECT_TRUE(producer.empty()); + EXPECT_FALSE(producer.full()); + EXPECT_EQ(10, producer.capacity()); + EXPECT_EQ(10, producer.capacity_free()); + EXPECT_EQ(0, producer.size()); + EXPECT_TRUE(producer.lock_free()); + EXPECT_EQ(0, producer.usage()); +} + +TEST(Queue, BaseAPI_Fixed) { + auto queue = queue_api::CreateQueue(); + auto producer = std::get(queue); + auto consumer = std::get(queue); + EXPECT_TRUE(producer.empty()); + EXPECT_FALSE(producer.full()); + EXPECT_EQ(100, producer.capacity()); + EXPECT_EQ(100, producer.capacity_free()); + EXPECT_EQ(0, producer.size()); + EXPECT_TRUE(producer.lock_free()); + EXPECT_EQ(0, producer.usage()); +} + +TEST(Queue, BaseAPI_DynamicLocked) { + auto queue = queue_api::CreateQueue(10); + auto producer = std::get(queue); + auto consumer = std::get(queue); + EXPECT_TRUE(producer.empty()); + EXPECT_FALSE(producer.full()); + EXPECT_EQ(10, producer.capacity()); + EXPECT_EQ(10, producer.capacity_free()); + EXPECT_EQ(0, producer.size()); + EXPECT_FALSE(producer.lock_free()); // NOT lock -free + EXPECT_EQ(0, producer.usage()); +} + +TEST(Queue, SFINAE_HasWaitAndPop) { + auto queue = queue_api::CreateQueue(); + auto consumer = std::get(queue); + + benchmark::stopwatch watch; + std::string msg; + std::chrono::milliseconds wait(2 * 1000); + auto result = consumer.wait_and_pop(msg, wait); // wait_and_pop but the bogus mock function ignores the 'wait'. We only verify SFINAE + EXPECT_FALSE(result); + EXPECT_TRUE(watch.elapsed_sec() <= 2); +} + +TEST(Queue, SFINAE_HasPop) { + auto queue = queue_api::CreateQueue(); + auto consumer = std::get(queue); + + benchmark::stopwatch watch; + std::string msg; + std::chrono::milliseconds wait(2 * 1000); + auto result = consumer.wait_and_pop(msg, wait); // wrapper implements wait. This is the actual function. we have the desired max wait. + EXPECT_FALSE(result); + EXPECT_TRUE(watch.elapsed_sec() >= 2); +} + +TEST(Queue, SFINAE_HasWaitAndPushp) { + auto queue = queue_api::CreateQueue(); + auto producer = std::get(queue); + + benchmark::stopwatch watch; + std::string msg; + std::chrono::milliseconds wait(2 * 1000); + auto result = producer.wait_and_push(msg, wait); // wait_and_pop but the bogus mock function ignores the 'wait'. We only verify SFINAE + EXPECT_FALSE(result); + EXPECT_TRUE(watch.elapsed_sec() <= 2); +} + +TEST(Queue, SFINAE_HasPush) { + auto queue = queue_api::CreateQueue(); + auto consumer = std::get(queue); + + benchmark::stopwatch watch; + std::string msg; + std::chrono::milliseconds wait(2 * 1000); + auto result = consumer.wait_and_push(msg, wait); // wrapper implements wait. This is the actual function. we have the desired max wait. + EXPECT_FALSE(result); + EXPECT_TRUE(watch.elapsed_sec() >= 2); +} + +template +void QAddOne(Prod& prod) { + std::string arg = "test"; + EXPECT_TRUE(prod.push(arg)); + EXPECT_FALSE(prod.full()); + EXPECT_EQ(100, prod.capacity()); + EXPECT_EQ(99, prod.capacity_free()); + EXPECT_EQ(1, prod.size()); + EXPECT_EQ(1, prod.tail()); +} + +TEST(Queue, CircularQueue_AddOne) { + FlexibleQ dQ{100}; + QAddOne(dQ); + + FixedQ fQ{}; + QAddOne(fQ); +} + +template +void AddTillFullRemoveTillEmpty(Prod& prod, Cons& cons) { + size_t size = 0; + size_t free = prod._qref.capacity(); + size_t loopSize = 2; + + EXPECT_EQ(0, prod.usage()); + for (size_t i = 0; i < loopSize; ++i) { + while (!prod.full()) { + std::string value = to_string(i); + EXPECT_TRUE(prod.push(value)); + ++size; + --free; + EXPECT_EQ((100 * prod._qref.size() / prod._qref.capacity()), prod._qref.usage()); + EXPECT_EQ(size, prod.size()); + EXPECT_EQ(free, prod.capacity_free()); + } + EXPECT_TRUE(prod.full()); + EXPECT_TRUE(cons.full()); + EXPECT_EQ(100, prod.usage()); + EXPECT_EQ(prod.size(), prod.capacity()) << "i: " << i; + EXPECT_EQ(cons.size(), cons.capacity()); + + string t; + while (!cons.empty()) { + cons.pop(t); + --size; + ++free; + EXPECT_EQ((100 * prod._qref.size() / prod._qref.capacity()), prod._qref.usage()); + EXPECT_EQ(size, cons.size()); + EXPECT_EQ(free, cons.capacity_free()); + } + } +} + +TEST(Queue, FlexibleQueue_AddTillFullRemoveTillEmpty) { + auto queue = queue_api::CreateQueue(100); + auto producer = std::get(queue); + auto consumer = std::get(queue); + AddTillFullRemoveTillEmpty(producer, consumer); +} + +TEST(Queue, FixedQueue_AddTillFullRemoveTillEmpty) { + auto queue = queue_api::CreateQueue(); + auto producer = std::get(queue); + auto consumer = std::get(queue); + AddTillFullRemoveTillEmpty(producer, consumer); +} + +TEST(Queue, LockedQ_AddTillFullRemoveTillEmpty) { + auto queue = queue_api::CreateQueue(100); + auto producer = std::get(queue); + auto consumer = std::get(queue); + AddTillFullRemoveTillEmpty(producer, consumer); +} + +template +void MoveArgument(Prod& prod, Cons& cons) { + std::string arg = "hello"; + EXPECT_TRUE(prod.push(arg)); + EXPECT_TRUE(arg.empty()); + + arg = "world"; + EXPECT_TRUE(prod.push(arg)); + EXPECT_TRUE(arg.empty()); + + arg = "!"; + EXPECT_FALSE(prod.push(arg)); + EXPECT_FALSE(arg.empty()); + EXPECT_EQ("!", arg); + + EXPECT_TRUE(cons.pop(arg)); + EXPECT_EQ("hello", arg); +} + +TEST(Queue, FlexibleQ_MoveArgument) { + auto queue = queue_api::CreateQueue(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveArgument(producer, consumer); +} + +TEST(Queue, FixedSmallQ_MoveArgument) { + auto queue = queue_api::CreateQueue(); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveArgument(producer, consumer); +} + +TEST(Queue, LockedQ_MoveArgument) { + auto queue = queue_api::CreateQueue(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveArgument(producer, consumer); +} + +template +void MoveUniquePtrArgument(Prod& prod, Cons& cons) { + auto arg = make_unique("hello"); + EXPECT_TRUE(prod.push(arg)); + ASSERT_TRUE(nullptr == arg); + + arg = make_unique("world"); + EXPECT_TRUE(prod.push(arg)); + ASSERT_TRUE(nullptr == arg); + + arg = make_unique("!"); + EXPECT_FALSE(prod.push(arg)); + EXPECT_FALSE(arg->empty()); + ASSERT_FALSE(nullptr == arg); + EXPECT_EQ("!", *arg.get()); + + EXPECT_TRUE(cons.pop(arg)); + EXPECT_EQ("hello", *arg.get()); +} + +namespace { + using Unique = unique_ptr; +} + +TEST(Queue, FlexibleQ_MoveUnique) { + auto queue = queue_api::CreateQueue>(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveUniquePtrArgument(producer, consumer); +} + +TEST(Queue, FixedQ_MoveUnique) { + auto queue = queue_api::CreateQueue>(); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveUniquePtrArgument(producer, consumer); +} + +TEST(Queue, LockedQ_MoveUnique) { + auto queue = queue_api::CreateQueue>(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + MoveUniquePtrArgument(producer, consumer); +} + +// Raw pointers are not transformed to nullptr +// since move semantics won't affect them +template +void NoMovePtrArgument(Prod& prod, Cons& cons) { + auto arg1 = make_unique("hello"); + auto arg1ptr = arg1.get(); + EXPECT_TRUE(prod.push(arg1ptr)); + ASSERT_FALSE(nullptr == arg1ptr); + + auto arg2 = make_unique("world"); + auto arg2ptr = arg2.get(); + EXPECT_TRUE(prod.push(arg2ptr)); + ASSERT_FALSE(nullptr == arg2ptr); + + auto arg3 = make_unique("!"); + auto arg3ptr = arg3.get(); + EXPECT_FALSE(prod.push(arg3ptr)); + EXPECT_FALSE(arg3ptr->empty()); + ASSERT_FALSE(nullptr == arg3ptr); + EXPECT_EQ("!", *arg3ptr); + + string* receive = nullptr; + EXPECT_TRUE(cons.pop(receive)); + EXPECT_EQ("hello", *receive); +} + +namespace { + using Ptr = string*; +} + +TEST(Queue, FlexibleQ_NoMoveOfPtr) { + auto queue = queue_api::CreateQueue>(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + NoMovePtrArgument(producer, consumer); +} + +TEST(Queue, FixedQ_NoMoveOfPtr) { + auto queue = queue_api::CreateQueue>(); + auto producer = std::get(queue); + auto consumer = std::get(queue); + NoMovePtrArgument(producer, consumer); +} + +TEST(Queue, LockedQ_NoMoveUnique) { + auto queue = queue_api::CreateQueue>(2); + auto producer = std::get(queue); + auto consumer = std::get(queue); + NoMovePtrArgument(producer, consumer); +}