Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TBB] Fix concurrent_[bounded_]queue correctness on weak memory models [13.0.x] #8358

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 320 additions & 0 deletions tbb-782.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
From 2f1b2a2055da106b98859b60a785c0c45829dc79 Mon Sep 17 00:00:00 2001
From: Alexei Katranov <[email protected]>
Date: Mon, 21 Feb 2022 15:03:10 +0300
Subject: [PATCH 1/6] Fix cq & cbq correctness on ARM

Signed-off-by: Alexei Katranov <[email protected]>
---
include/oneapi/tbb/concurrent_queue.h | 6 +++--
.../tbb/detail/_concurrent_queue_base.h | 14 +++++------
test/tbb/test_concurrent_queue_whitebox.cpp | 23 +++++++++----------
3 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
index 18f5bc80cf..8b5f289104 100644
--- a/include/oneapi/tbb/concurrent_queue.h
+++ b/include/oneapi/tbb/concurrent_queue.h
@@ -183,7 +183,8 @@ class concurrent_queue {
bool internal_try_pop( void* dst ) {
ticket_type k;
do {
- k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
+ k = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
// Queue is empty
@@ -514,7 +515,8 @@ class concurrent_bounded_queue {
bool internal_pop_if_present( void* dst ) {
ticket_type ticket;
do {
- ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
+ ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
// Queue is empty
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h
index 8bdf213230..b51b2e419b 100644
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h
@@ -123,7 +123,7 @@ class micro_queue {
page_allocator_traits::construct(page_allocator, p);
}

- if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base);
+ spin_wait_until_my_turn(tail_counter, k, base);
d1::call_itt_notify(d1::acquired, &tail_counter);

if (p) {
@@ -134,9 +134,9 @@ class micro_queue {
} else {
head_page.store(p, std::memory_order_relaxed);
}
- tail_page.store(p, std::memory_order_release);
+ tail_page.store(p, std::memory_order_relaxed);
} else {
- p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ?
+ p = tail_page.load(std::memory_order_relaxed);
}
return index;
}
@@ -179,7 +179,7 @@ class micro_queue {
d1::call_itt_notify(d1::acquired, &head_counter);
spin_wait_while_eq(tail_counter, k);
d1::call_itt_notify(d1::acquired, &tail_counter);
- padded_page *p = head_page.load(std::memory_order_acquire);
+ padded_page *p = head_page.load(std::memory_order_relaxed);
__TBB_ASSERT( p, nullptr );
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
bool success = false;
@@ -337,7 +337,7 @@ class micro_queue {

void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
for (atomic_backoff b(true);; b.pause()) {
- ticket_type c = counter;
+ ticket_type c = counter.load(std::memory_order_acquire);
if (c == k) return;
else if (c & 1) {
++rb.n_invalid_entries;
@@ -378,9 +378,9 @@ class micro_queue_pop_finalizer {
if( is_valid_page(p) ) {
spin_mutex::scoped_lock lock( my_queue.page_mutex );
padded_page* q = p->next;
- my_queue.head_page.store(q, std::memory_order_release);
+ my_queue.head_page.store(q, std::memory_order_relaxed);
if( !is_valid_page(q) ) {
- my_queue.tail_page.store(nullptr, std::memory_order_release);
+ my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
}
}
my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp
index 18da8def25..59ce7f54cb 100644
--- a/test/tbb/test_concurrent_queue_whitebox.cpp
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp
@@ -51,7 +51,8 @@ class FloggerBody {
value_type elem = value_type(thread_id);
for (std::size_t i = 0; i < elem_num; ++i) {
q.push(elem);
- q.try_pop(elem);
+ bool res = q.try_pop(elem);
+ CHECK_FAST(res);
}
}

@@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) {
REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test");
}

-template <typename T>
-void test_flogger() {
- {
- tbb::concurrent_queue<T> q;
- test_flogger_help(q, q.my_queue_representation->items_per_page);
- }
- {
- tbb::concurrent_bounded_queue<T> q;
+//! \brief \ref error_guessing
+TEST_CASE("Test CQ Wrapparound") {
+ for (int i = 0; i < 1000; ++i) {
+ tbb::concurrent_queue<int> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
}

//! \brief \ref error_guessing
-TEST_CASE("Test Wrapparound") {
- test_flogger<int>();
- // TODO: add test with unsigned char
+TEST_CASE("Test CBQ Wrapparound") {
+ for (int i = 0; i < 1000; ++i) {
+ tbb::concurrent_bounded_queue<int> q;
+ test_flogger_help(q, q.my_queue_representation->items_per_page);
+ }
}

From 65a157d0a625383280fe6bddd046414cd0d980f3 Mon Sep 17 00:00:00 2001
From: Alex <[email protected]>
Date: Mon, 21 Feb 2022 15:25:18 +0300
Subject: [PATCH 2/6] Fix spelling

---
include/oneapi/tbb/concurrent_queue.h | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
index 8b5f289104..8083b5a2db 100644
--- a/include/oneapi/tbb/concurrent_queue.h
+++ b/include/oneapi/tbb/concurrent_queue.h
@@ -183,7 +183,7 @@ class concurrent_queue {
bool internal_try_pop( void* dst ) {
ticket_type k;
do {
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
k = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
@@ -515,7 +515,7 @@ class concurrent_bounded_queue {
bool internal_pop_if_present( void* dst ) {
ticket_type ticket;
do {
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty

From 8895e287d44120b1ebfc4eab367fc505456c5100 Mon Sep 17 00:00:00 2001
From: Alexei Katranov <[email protected]>
Date: Tue, 1 Mar 2022 15:00:08 +0300
Subject: [PATCH 3/6] Factor out try_pop

Signed-off-by: Alexei Katranov <[email protected]>
---
include/oneapi/tbb/concurrent_queue.h | 56 +++++++++++++--------------
1 file changed, 26 insertions(+), 30 deletions(-)

diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
index 8083b5a2db..45c8cf9024 100644
--- a/include/oneapi/tbb/concurrent_queue.h
+++ b/include/oneapi/tbb/concurrent_queue.h
@@ -28,6 +28,24 @@ namespace tbb {
namespace detail {
namespace d2 {

+template <typename QueueRep, typename Allocator>
+std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
+ ticket_type ticket{};
+ do {
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
+ ticket = queue.head_counter.load(std::memory_order_acquire);
+ do {
+ if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
+ // Queue is empty
+ return { false, ticket };
+ }
+ // Queue had item with ticket k when we looked. Attempt to get that item.
+ // Another thread snatched the item, retry.
+ } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
+ } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
+ return { true, ticket };
+}
+
// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
@@ -181,21 +199,7 @@ class concurrent_queue {
}

bool internal_try_pop( void* dst ) {
- ticket_type k;
- do {
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
- k = my_queue_representation->head_counter.load(std::memory_order_acquire);
- do {
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
- // Queue is empty
- return false;
- }
-
- // Queue had item with ticket k when we looked. Attempt to get that item.
- // Another thread snatched the item, retry.
- } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
- } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
- return true;
+ return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
}

template <typename Container, typename Value, typename A>
@@ -513,22 +517,14 @@ class concurrent_bounded_queue {
}

bool internal_pop_if_present( void* dst ) {
- ticket_type ticket;
- do {
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
- ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
- do {
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
- // Queue is empty
- return false;
- }
- // Queue had item with ticket k when we looked. Attempt to get that item.
- // Another thread snatched the item, retry.
- } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
- } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));
+ bool present{};
+ ticket_type ticket{};
+ std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);

- r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
- return true;
+ if (present) {
+ r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
+ }
+ return present;
}

void internal_abort() {

From 14532b2b7551f9a20653b7f4f692f8e52b35adbb Mon Sep 17 00:00:00 2001
From: Alex <[email protected]>
Date: Tue, 1 Mar 2022 15:09:42 +0300
Subject: [PATCH 4/6] Update test_concurrent_queue_whitebox.cpp

Fix copyright year
---
test/tbb/test_concurrent_queue_whitebox.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp
index 59ce7f54cb..1ba1530e70 100644
--- a/test/tbb/test_concurrent_queue_whitebox.cpp
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2005-2021 Intel Corporation
+ Copyright (c) 2005-2022 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

From 073cac5d77fc7045aad2f96d92b6030d71617cf4 Mon Sep 17 00:00:00 2001
From: pavelkumbrasev <[email protected]>
Date: Fri, 4 Nov 2022 08:52:17 +0000
Subject: [PATCH 6/6] Apply review suggestions

Signed-off-by: pavelkumbrasev <[email protected]>
---
include/oneapi/tbb/concurrent_queue.h | 2 +-
include/oneapi/tbb/detail/_concurrent_queue_base.h | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
index 0a821efa2a..d694bd2b47 100644
--- a/include/oneapi/tbb/concurrent_queue.h
+++ b/include/oneapi/tbb/concurrent_queue.h
@@ -32,7 +32,7 @@ template <typename QueueRep, typename Allocator>
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
ticket_type ticket{};
do {
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
+ // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
ticket = queue.head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h
index b51b2e419b..b868797985 100644
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h
@@ -336,7 +336,7 @@ class micro_queue {
}

void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
- for (atomic_backoff b(true);; b.pause()) {
+ for (atomic_backoff b{};; b.pause()) {
ticket_type c = counter.load(std::memory_order_acquire);
if (c == k) return;
else if (c & 1) {
2 changes: 2 additions & 0 deletions tbb.spec
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
%define github_user oneapi-src
%define github_repo oneTBB
Source: git+https://github.com/%{github_user}/%{github_repo}.git?obj=%{branch}/%{tag}&export=%{n}-%{realversion}&output=/%{n}-%{branch}-%{tag}.tgz
Patch0: tbb-782
Requires: hwloc
BuildRequires: cmake

%prep
%setup -n %{n}-%{realversion}
%patch0 -p1

%build
rm -rf %{_builddir}/build
Expand Down