Skip to content

Commit

Permalink
Fix concurrent_[bounded_]queue correctness on weak memory models (#782)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexei Katranov <[email protected]>
Co-authored-by: pavelkumbrasev <[email protected]>
  • Loading branch information
alexey-katranov and pavelkumbrasev authored Nov 4, 2022
1 parent a45d68f commit eac9465
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 49 deletions.
54 changes: 26 additions & 28 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ namespace tbb {
namespace detail {
namespace d2 {

template <typename QueueRep, typename Allocator>
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
ticket_type ticket{};
do {
// Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
ticket = queue.head_counter.load(std::memory_order_acquire);
do {
if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
// Queue is empty
return { false, ticket };
}
// Queue had item with ticket k when we looked. Attempt to get that item.
// Another thread snatched the item, retry.
} while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
} while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
return { true, ticket };
}

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
Expand Down Expand Up @@ -178,20 +196,7 @@ class concurrent_queue {
}

bool internal_try_pop( void* dst ) {
ticket_type k;
do {
k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
// Queue is empty
return false;
}

// Queue had item with ticket k when we looked. Attempt to get that item.
// Another thread snatched the item, retry.
} while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
} while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
return true;
return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
}

template <typename Container, typename Value, typename A>
Expand Down Expand Up @@ -505,21 +510,14 @@ class concurrent_bounded_queue {
}

bool internal_pop_if_present( void* dst ) {
ticket_type ticket;
do {
ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
do {
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
// Queue is empty
return false;
}
// Queue had item with ticket k when we looked. Attempt to get that item.
// Another thread snatched the item, retry.
} while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
} while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));
bool present{};
ticket_type ticket{};
std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);

r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
return true;
if (present) {
r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
}
return present;
}

void internal_abort() {
Expand Down
16 changes: 8 additions & 8 deletions include/oneapi/tbb/detail/_concurrent_queue_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class micro_queue {
page_allocator_traits::construct(page_allocator, p);
}

if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base);
spin_wait_until_my_turn(tail_counter, k, base);
d1::call_itt_notify(d1::acquired, &tail_counter);

if (p) {
Expand All @@ -134,9 +134,9 @@ class micro_queue {
} else {
head_page.store(p, std::memory_order_relaxed);
}
tail_page.store(p, std::memory_order_release);
tail_page.store(p, std::memory_order_relaxed);
} else {
p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ?
p = tail_page.load(std::memory_order_relaxed);
}
return index;
}
Expand Down Expand Up @@ -179,7 +179,7 @@ class micro_queue {
d1::call_itt_notify(d1::acquired, &head_counter);
spin_wait_while_eq(tail_counter, k);
d1::call_itt_notify(d1::acquired, &tail_counter);
padded_page *p = head_page.load(std::memory_order_acquire);
padded_page *p = head_page.load(std::memory_order_relaxed);
__TBB_ASSERT( p, nullptr );
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
bool success = false;
Expand Down Expand Up @@ -338,8 +338,8 @@ class micro_queue {
}

void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
for (atomic_backoff b(true);; b.pause()) {
ticket_type c = counter;
for (atomic_backoff b{};; b.pause()) {
ticket_type c = counter.load(std::memory_order_acquire);
if (c == k) return;
else if (c & 1) {
++rb.n_invalid_entries;
Expand Down Expand Up @@ -380,9 +380,9 @@ class micro_queue_pop_finalizer {
if( is_valid_page(p) ) {
spin_mutex::scoped_lock lock( my_queue.page_mutex );
padded_page* q = p->next;
my_queue.head_page.store(q, std::memory_order_release);
my_queue.head_page.store(q, std::memory_order_relaxed);
if( !is_valid_page(q) ) {
my_queue.tail_page.store(nullptr, std::memory_order_release);
my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
}
}
my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
Expand Down
25 changes: 12 additions & 13 deletions test/tbb/test_concurrent_queue_whitebox.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2022 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,8 @@ class FloggerBody {
value_type elem = value_type(thread_id);
for (std::size_t i = 0; i < elem_num; ++i) {
q.push(elem);
q.try_pop(elem);
bool res = q.try_pop(elem);
CHECK_FAST(res);
}
}

Expand Down Expand Up @@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) {
REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test");
}

template <typename T>
void test_flogger() {
{
tbb::concurrent_queue<T> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
{
tbb::concurrent_bounded_queue<T> q;
//! \brief \ref error_guessing
TEST_CASE("Test CQ Wrapparound") {
for (int i = 0; i < 1000; ++i) {
tbb::concurrent_queue<int> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
}

//! \brief \ref error_guessing
TEST_CASE("Test Wrapparound") {
test_flogger<int>();
// TODO: add test with unsigned char
TEST_CASE("Test CBQ Wrapparound") {
for (int i = 0; i < 1000; ++i) {
tbb::concurrent_bounded_queue<int> q;
test_flogger_help(q, q.my_queue_representation->items_per_page);
}
}

0 comments on commit eac9465

Please sign in to comment.