-
Notifications
You must be signed in to change notification settings - Fork 184
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[TBB] Fix concurrent_[bounded_]queue correctness on weak memory models
Co-authored-by: Ivan Razumov <[email protected]> Co-authored-by: Andrea Bocci <[email protected]>
- Loading branch information
1 parent
770e0f9
commit 867a8b5
Showing
2 changed files
with
322 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,320 @@ | ||
From 2f1b2a2055da106b98859b60a785c0c45829dc79 Mon Sep 17 00:00:00 2001 | ||
From: Alexei Katranov <[email protected]> | ||
Date: Mon, 21 Feb 2022 15:03:10 +0300 | ||
Subject: [PATCH 1/6] Fix cq & cbq correctness on ARM | ||
|
||
Signed-off-by: Alexei Katranov <[email protected]> | ||
--- | ||
include/oneapi/tbb/concurrent_queue.h | 6 +++-- | ||
.../tbb/detail/_concurrent_queue_base.h | 14 +++++------ | ||
test/tbb/test_concurrent_queue_whitebox.cpp | 23 +++++++++---------- | ||
3 files changed, 22 insertions(+), 21 deletions(-) | ||
|
||
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h | ||
index 18f5bc80cf..8b5f289104 100644 | ||
--- a/include/oneapi/tbb/concurrent_queue.h | ||
+++ b/include/oneapi/tbb/concurrent_queue.h | ||
@@ -183,7 +183,8 @@ class concurrent_queue { | ||
bool internal_try_pop( void* dst ) { | ||
ticket_type k; | ||
do { | ||
- k = my_queue_representation->head_counter.load(std::memory_order_relaxed); | ||
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` | ||
+ k = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
do { | ||
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { | ||
// Queue is empty | ||
@@ -514,7 +515,8 @@ class concurrent_bounded_queue { | ||
bool internal_pop_if_present( void* dst ) { | ||
ticket_type ticket; | ||
do { | ||
- ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed); | ||
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` | ||
+ ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
do { | ||
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty | ||
// Queue is empty | ||
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
index 8bdf213230..b51b2e419b 100644 | ||
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
@@ -123,7 +123,7 @@ class micro_queue { | ||
page_allocator_traits::construct(page_allocator, p); | ||
} | ||
|
||
- if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base); | ||
+ spin_wait_until_my_turn(tail_counter, k, base); | ||
d1::call_itt_notify(d1::acquired, &tail_counter); | ||
|
||
if (p) { | ||
@@ -134,9 +134,9 @@ class micro_queue { | ||
} else { | ||
head_page.store(p, std::memory_order_relaxed); | ||
} | ||
- tail_page.store(p, std::memory_order_release); | ||
+ tail_page.store(p, std::memory_order_relaxed); | ||
} else { | ||
- p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ? | ||
+ p = tail_page.load(std::memory_order_relaxed); | ||
} | ||
return index; | ||
} | ||
@@ -179,7 +179,7 @@ class micro_queue { | ||
d1::call_itt_notify(d1::acquired, &head_counter); | ||
spin_wait_while_eq(tail_counter, k); | ||
d1::call_itt_notify(d1::acquired, &tail_counter); | ||
- padded_page *p = head_page.load(std::memory_order_acquire); | ||
+ padded_page *p = head_page.load(std::memory_order_relaxed); | ||
__TBB_ASSERT( p, nullptr ); | ||
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page ); | ||
bool success = false; | ||
@@ -337,7 +337,7 @@ class micro_queue { | ||
|
||
void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const { | ||
for (atomic_backoff b(true);; b.pause()) { | ||
- ticket_type c = counter; | ||
+ ticket_type c = counter.load(std::memory_order_acquire); | ||
if (c == k) return; | ||
else if (c & 1) { | ||
++rb.n_invalid_entries; | ||
@@ -378,9 +378,9 @@ class micro_queue_pop_finalizer { | ||
if( is_valid_page(p) ) { | ||
spin_mutex::scoped_lock lock( my_queue.page_mutex ); | ||
padded_page* q = p->next; | ||
- my_queue.head_page.store(q, std::memory_order_release); | ||
+ my_queue.head_page.store(q, std::memory_order_relaxed); | ||
if( !is_valid_page(q) ) { | ||
- my_queue.tail_page.store(nullptr, std::memory_order_release); | ||
+ my_queue.tail_page.store(nullptr, std::memory_order_relaxed); | ||
} | ||
} | ||
my_queue.head_counter.store(my_ticket_type, std::memory_order_release); | ||
diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp | ||
index 18da8def25..59ce7f54cb 100644 | ||
--- a/test/tbb/test_concurrent_queue_whitebox.cpp | ||
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp | ||
@@ -51,7 +51,8 @@ class FloggerBody { | ||
value_type elem = value_type(thread_id); | ||
for (std::size_t i = 0; i < elem_num; ++i) { | ||
q.push(elem); | ||
- q.try_pop(elem); | ||
+ bool res = q.try_pop(elem); | ||
+ CHECK_FAST(res); | ||
} | ||
} | ||
|
||
@@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) { | ||
REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test"); | ||
} | ||
|
||
-template <typename T> | ||
-void test_flogger() { | ||
- { | ||
- tbb::concurrent_queue<T> q; | ||
- test_flogger_help(q, q.my_queue_representation->items_per_page); | ||
- } | ||
- { | ||
- tbb::concurrent_bounded_queue<T> q; | ||
+//! \brief \ref error_guessing | ||
+TEST_CASE("Test CQ Wrapparound") { | ||
+ for (int i = 0; i < 1000; ++i) { | ||
+ tbb::concurrent_queue<int> q; | ||
test_flogger_help(q, q.my_queue_representation->items_per_page); | ||
} | ||
} | ||
|
||
//! \brief \ref error_guessing | ||
-TEST_CASE("Test Wrapparound") { | ||
- test_flogger<int>(); | ||
- // TODO: add test with unsigned char | ||
+TEST_CASE("Test CBQ Wrapparound") { | ||
+ for (int i = 0; i < 1000; ++i) { | ||
+ tbb::concurrent_bounded_queue<int> q; | ||
+ test_flogger_help(q, q.my_queue_representation->items_per_page); | ||
+ } | ||
} | ||
|
||
From 65a157d0a625383280fe6bddd046414cd0d980f3 Mon Sep 17 00:00:00 2001 | ||
From: Alex <[email protected]> | ||
Date: Mon, 21 Feb 2022 15:25:18 +0300 | ||
Subject: [PATCH 2/6] Fix spelling | ||
|
||
--- | ||
include/oneapi/tbb/concurrent_queue.h | 4 ++-- | ||
1 file changed, 2 insertions(+), 2 deletions(-) | ||
|
||
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h | ||
index 8b5f289104..8083b5a2db 100644 | ||
--- a/include/oneapi/tbb/concurrent_queue.h | ||
+++ b/include/oneapi/tbb/concurrent_queue.h | ||
@@ -183,7 +183,7 @@ class concurrent_queue { | ||
bool internal_try_pop( void* dst ) { | ||
ticket_type k; | ||
do { | ||
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` | ||
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
k = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
do { | ||
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { | ||
@@ -515,7 +515,7 @@ class concurrent_bounded_queue { | ||
bool internal_pop_if_present( void* dst ) { | ||
ticket_type ticket; | ||
do { | ||
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` | ||
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
do { | ||
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty | ||
|
||
From 8895e287d44120b1ebfc4eab367fc505456c5100 Mon Sep 17 00:00:00 2001 | ||
From: Alexei Katranov <[email protected]> | ||
Date: Tue, 1 Mar 2022 15:00:08 +0300 | ||
Subject: [PATCH 3/6] Factor out try_pop | ||
|
||
Signed-off-by: Alexei Katranov <[email protected]> | ||
--- | ||
include/oneapi/tbb/concurrent_queue.h | 56 +++++++++++++-------------- | ||
1 file changed, 26 insertions(+), 30 deletions(-) | ||
|
||
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h | ||
index 8083b5a2db..45c8cf9024 100644 | ||
--- a/include/oneapi/tbb/concurrent_queue.h | ||
+++ b/include/oneapi/tbb/concurrent_queue.h | ||
@@ -28,6 +28,24 @@ namespace tbb { | ||
namespace detail { | ||
namespace d2 { | ||
|
||
+template <typename QueueRep, typename Allocator> | ||
+std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) { | ||
+ ticket_type ticket{}; | ||
+ do { | ||
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
+ ticket = queue.head_counter.load(std::memory_order_acquire); | ||
+ do { | ||
+ if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty | ||
+ // Queue is empty | ||
+ return { false, ticket }; | ||
+ } | ||
+ // Queue had item with ticket k when we looked. Attempt to get that item. | ||
+ // Another thread snatched the item, retry. | ||
+ } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1)); | ||
+ } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc)); | ||
+ return { true, ticket }; | ||
+} | ||
+ | ||
// A high-performance thread-safe non-blocking concurrent queue. | ||
// Multiple threads may each push and pop concurrently. | ||
// Assignment construction is not allowed. | ||
@@ -181,21 +199,7 @@ class concurrent_queue { | ||
} | ||
|
||
bool internal_try_pop( void* dst ) { | ||
- ticket_type k; | ||
- do { | ||
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
- k = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
- do { | ||
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { | ||
- // Queue is empty | ||
- return false; | ||
- } | ||
- | ||
- // Queue had item with ticket k when we looked. Attempt to get that item. | ||
- // Another thread snatched the item, retry. | ||
- } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1)); | ||
- } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator)); | ||
- return true; | ||
+ return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first; | ||
} | ||
|
||
template <typename Container, typename Value, typename A> | ||
@@ -513,22 +517,14 @@ class concurrent_bounded_queue { | ||
} | ||
|
||
bool internal_pop_if_present( void* dst ) { | ||
- ticket_type ticket; | ||
- do { | ||
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
- ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); | ||
- do { | ||
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty | ||
- // Queue is empty | ||
- return false; | ||
- } | ||
- // Queue had item with ticket k when we looked. Attempt to get that item. | ||
- // Another thread snatched the item, retry. | ||
- } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1)); | ||
- } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator)); | ||
+ bool present{}; | ||
+ ticket_type ticket{}; | ||
+ std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator); | ||
|
||
- r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket); | ||
- return true; | ||
+ if (present) { | ||
+ r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket); | ||
+ } | ||
+ return present; | ||
} | ||
|
||
void internal_abort() { | ||
|
||
From 14532b2b7551f9a20653b7f4f692f8e52b35adbb Mon Sep 17 00:00:00 2001 | ||
From: Alex <[email protected]> | ||
Date: Tue, 1 Mar 2022 15:09:42 +0300 | ||
Subject: [PATCH 4/6] Update test_concurrent_queue_whitebox.cpp | ||
|
||
Fix copyright year | ||
--- | ||
test/tbb/test_concurrent_queue_whitebox.cpp | 2 +- | ||
1 file changed, 1 insertion(+), 1 deletion(-) | ||
|
||
diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp | ||
index 59ce7f54cb..1ba1530e70 100644 | ||
--- a/test/tbb/test_concurrent_queue_whitebox.cpp | ||
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp | ||
@@ -1,5 +1,5 @@ | ||
/* | ||
- Copyright (c) 2005-2021 Intel Corporation | ||
+ Copyright (c) 2005-2022 Intel Corporation | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
|
||
From 073cac5d77fc7045aad2f96d92b6030d71617cf4 Mon Sep 17 00:00:00 2001 | ||
From: pavelkumbrasev <[email protected]> | ||
Date: Fri, 4 Nov 2022 08:52:17 +0000 | ||
Subject: [PATCH 6/6] Apply review suggestions | ||
|
||
Signed-off-by: pavelkumbrasev <[email protected]> | ||
--- | ||
include/oneapi/tbb/concurrent_queue.h | 2 +- | ||
include/oneapi/tbb/detail/_concurrent_queue_base.h | 2 +- | ||
2 files changed, 2 insertions(+), 2 deletions(-) | ||
|
||
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h | ||
index 0a821efa2a..d694bd2b47 100644 | ||
--- a/include/oneapi/tbb/concurrent_queue.h | ||
+++ b/include/oneapi/tbb/concurrent_queue.h | ||
@@ -32,7 +32,7 @@ template <typename QueueRep, typename Allocator> | ||
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) { | ||
ticket_type ticket{}; | ||
do { | ||
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` | ||
+ // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter` | ||
ticket = queue.head_counter.load(std::memory_order_acquire); | ||
do { | ||
if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty | ||
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
index b51b2e419b..b868797985 100644 | ||
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h | ||
@@ -336,7 +336,7 @@ class micro_queue { | ||
} | ||
|
||
void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const { | ||
- for (atomic_backoff b(true);; b.pause()) { | ||
+ for (atomic_backoff b{};; b.pause()) { | ||
ticket_type c = counter.load(std::memory_order_acquire); | ||
if (c == k) return; | ||
else if (c & 1) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters