Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent_[bounded_]queue correctness on weak memory models #782

Merged
merged 6 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ class concurrent_queue {
bool internal_try_pop( void* dst ) {
ticket_type k;
do {
k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
// Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
k = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
// Queue is empty
Expand Down Expand Up @@ -514,7 +515,8 @@ class concurrent_bounded_queue {
bool internal_pop_if_present( void* dst ) {
aleksei-fedotov marked this conversation as resolved.
Show resolved Hide resolved
ticket_type ticket;
do {
ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
// Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
// Queue is empty
Expand Down
14 changes: 7 additions & 7 deletions include/oneapi/tbb/detail/_concurrent_queue_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class micro_queue {
page_allocator_traits::construct(page_allocator, p);
}

if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base);
spin_wait_until_my_turn(tail_counter, k, base);
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
d1::call_itt_notify(d1::acquired, &tail_counter);

if (p) {
Expand All @@ -134,9 +134,9 @@ class micro_queue {
} else {
head_page.store(p, std::memory_order_relaxed);
}
tail_page.store(p, std::memory_order_release);
tail_page.store(p, std::memory_order_relaxed);
} else {
p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ?
p = tail_page.load(std::memory_order_relaxed);
aleksei-fedotov marked this conversation as resolved.
Show resolved Hide resolved
}
return index;
}
Expand Down Expand Up @@ -179,7 +179,7 @@ class micro_queue {
d1::call_itt_notify(d1::acquired, &head_counter);
spin_wait_while_eq(tail_counter, k);
d1::call_itt_notify(d1::acquired, &tail_counter);
padded_page *p = head_page.load(std::memory_order_acquire);
padded_page *p = head_page.load(std::memory_order_relaxed);
__TBB_ASSERT( p, nullptr );
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
bool success = false;
Expand Down Expand Up @@ -337,7 +337,7 @@ class micro_queue {

void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
for (atomic_backoff b(true);; b.pause()) {
ticket_type c = counter;
ticket_type c = counter.load(std::memory_order_acquire);
if (c == k) return;
else if (c & 1) {
++rb.n_invalid_entries;
Expand Down Expand Up @@ -378,9 +378,9 @@ class micro_queue_pop_finalizer {
if( is_valid_page(p) ) {
spin_mutex::scoped_lock lock( my_queue.page_mutex );
padded_page* q = p->next;
my_queue.head_page.store(q, std::memory_order_release);
my_queue.head_page.store(q, std::memory_order_relaxed);
if( !is_valid_page(q) ) {
my_queue.tail_page.store(nullptr, std::memory_order_release);
my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
}
}
my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
Expand Down
23 changes: 11 additions & 12 deletions test/tbb/test_concurrent_queue_whitebox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class FloggerBody {
value_type elem = value_type(thread_id);
for (std::size_t i = 0; i < elem_num; ++i) {
q.push(elem);
q.try_pop(elem);
bool res = q.try_pop(elem);
CHECK_FAST(res);
}
}

Expand Down Expand Up @@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) {
REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test");
}

template <typename T>
void test_flogger() {
{
tbb::concurrent_queue<T> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
{
tbb::concurrent_bounded_queue<T> q;
//! \brief \ref error_guessing
TEST_CASE("Test CQ Wrapparound") {
for (int i = 0; i < 1000; ++i) {
tbb::concurrent_queue<int> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
}

//! \brief \ref error_guessing
TEST_CASE("Test Wrapparound") {
test_flogger<int>();
// TODO: add test with unsigned char
TEST_CASE("Test CBQ Wrapparound") {
for (int i = 0; i < 1000; ++i) {
tbb::concurrent_bounded_queue<int> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
}