From 867a8b5274e0bdd0e10a4931cef62aed0247dc33 Mon Sep 17 00:00:00 2001 From: Malik Shahzad Muzaffar Date: Fri, 3 Mar 2023 16:39:19 +0100 Subject: [PATCH] [TBB] Fix concurrent_[bounded_]queue correctness on weak memory models Co-authored-by: Ivan Razumov Co-authored-by: Andrea Bocci --- tbb-782.patch | 320 ++++++++++++++++++++++++++++++++++++++++++++++++++ tbb.spec | 2 + 2 files changed, 322 insertions(+) create mode 100644 tbb-782.patch diff --git a/tbb-782.patch b/tbb-782.patch new file mode 100644 index 00000000000..08f0300bc3b --- /dev/null +++ b/tbb-782.patch @@ -0,0 +1,320 @@ +From 2f1b2a2055da106b98859b60a785c0c45829dc79 Mon Sep 17 00:00:00 2001 +From: Alexei Katranov +Date: Mon, 21 Feb 2022 15:03:10 +0300 +Subject: [PATCH 1/6] Fix cq & cbq correctness on ARM + +Signed-off-by: Alexei Katranov +--- + include/oneapi/tbb/concurrent_queue.h | 6 +++-- + .../tbb/detail/_concurrent_queue_base.h | 14 +++++------ + test/tbb/test_concurrent_queue_whitebox.cpp | 23 +++++++++---------- + 3 files changed, 22 insertions(+), 21 deletions(-) + +diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h +index 18f5bc80cf..8b5f289104 100644 +--- a/include/oneapi/tbb/concurrent_queue.h ++++ b/include/oneapi/tbb/concurrent_queue.h +@@ -183,7 +183,8 @@ class concurrent_queue { + bool internal_try_pop( void* dst ) { + ticket_type k; + do { +- k = my_queue_representation->head_counter.load(std::memory_order_relaxed); ++ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` ++ k = my_queue_representation->head_counter.load(std::memory_order_acquire); + do { + if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { + // Queue is empty +@@ -514,7 +515,8 @@ class concurrent_bounded_queue { + bool internal_pop_if_present( void* dst ) { + ticket_type ticket; + do { +- ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed); ++ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` ++ ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); + do { + if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty + // Queue is empty +diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h +index 8bdf213230..b51b2e419b 100644 +--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h ++++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h +@@ -123,7 +123,7 @@ class micro_queue { + page_allocator_traits::construct(page_allocator, p); + } + +- if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base); ++ spin_wait_until_my_turn(tail_counter, k, base); + d1::call_itt_notify(d1::acquired, &tail_counter); + + if (p) { +@@ -134,9 +134,9 @@ class micro_queue { + } else { + head_page.store(p, std::memory_order_relaxed); + } +- tail_page.store(p, std::memory_order_release); ++ tail_page.store(p, std::memory_order_relaxed); + } else { +- p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ? ++ p = tail_page.load(std::memory_order_relaxed); + } + return index; + } +@@ -179,7 +179,7 @@ class micro_queue { + d1::call_itt_notify(d1::acquired, &head_counter); + spin_wait_while_eq(tail_counter, k); + d1::call_itt_notify(d1::acquired, &tail_counter); +- padded_page *p = head_page.load(std::memory_order_acquire); ++ padded_page *p = head_page.load(std::memory_order_relaxed); + __TBB_ASSERT( p, nullptr ); + size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page ); + bool success = false; +@@ -337,7 +337,7 @@ class micro_queue { + + void spin_wait_until_my_turn( std::atomic& counter, ticket_type k, queue_rep_type& rb ) const { + for (atomic_backoff b(true);; b.pause()) { +- ticket_type c = counter; ++ ticket_type c = counter.load(std::memory_order_acquire); + if (c == k) return; + else if (c & 1) { + ++rb.n_invalid_entries; +@@ -378,9 +378,9 @@ class micro_queue_pop_finalizer { + if( is_valid_page(p) ) { + spin_mutex::scoped_lock lock( my_queue.page_mutex ); + padded_page* q = p->next; +- my_queue.head_page.store(q, std::memory_order_release); ++ my_queue.head_page.store(q, std::memory_order_relaxed); + if( !is_valid_page(q) ) { +- my_queue.tail_page.store(nullptr, std::memory_order_release); ++ my_queue.tail_page.store(nullptr, std::memory_order_relaxed); + } + } + my_queue.head_counter.store(my_ticket_type, std::memory_order_release); +diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp +index 18da8def25..59ce7f54cb 100644 +--- a/test/tbb/test_concurrent_queue_whitebox.cpp ++++ b/test/tbb/test_concurrent_queue_whitebox.cpp +@@ -51,7 +51,8 @@ class FloggerBody { + value_type elem = value_type(thread_id); + for (std::size_t i = 0; i < elem_num; ++i) { + q.push(elem); +- q.try_pop(elem); ++ bool res = q.try_pop(elem); ++ CHECK_FAST(res); + } + } + +@@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) { + REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test"); + } + +-template +-void test_flogger() { +- { +- tbb::concurrent_queue q; +- test_flogger_help(q, q.my_queue_representation->items_per_page); +- } +- { +- tbb::concurrent_bounded_queue q; ++//! \brief \ref error_guessing ++TEST_CASE("Test CQ Wrapparound") { ++ for (int i = 0; i < 1000; ++i) { ++ tbb::concurrent_queue q; + test_flogger_help(q, q.my_queue_representation->items_per_page); + } + } + + //! \brief \ref error_guessing +-TEST_CASE("Test Wrapparound") { +- test_flogger(); +- // TODO: add test with unsigned char ++TEST_CASE("Test CBQ Wrapparound") { ++ for (int i = 0; i < 1000; ++i) { ++ tbb::concurrent_bounded_queue q; ++ test_flogger_help(q, q.my_queue_representation->items_per_page); ++ } + } + +From 65a157d0a625383280fe6bddd046414cd0d980f3 Mon Sep 17 00:00:00 2001 +From: Alex +Date: Mon, 21 Feb 2022 15:25:18 +0300 +Subject: [PATCH 2/6] Fix spelling + +--- + include/oneapi/tbb/concurrent_queue.h | 4 ++-- + 1 file changed, 2 insertions(+), 2 deletions(-) + +diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h +index 8b5f289104..8083b5a2db 100644 +--- a/include/oneapi/tbb/concurrent_queue.h ++++ b/include/oneapi/tbb/concurrent_queue.h +@@ -183,7 +183,7 @@ class concurrent_queue { + bool internal_try_pop( void* dst ) { + ticket_type k; + do { +- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` ++ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` + k = my_queue_representation->head_counter.load(std::memory_order_acquire); + do { + if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { +@@ -515,7 +515,7 @@ class concurrent_bounded_queue { + bool internal_pop_if_present( void* dst ) { + ticket_type ticket; + do { +- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head` ++ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` + ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); + do { + if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty + +From 8895e287d44120b1ebfc4eab367fc505456c5100 Mon Sep 17 00:00:00 2001 +From: Alexei Katranov +Date: Tue, 1 Mar 2022 15:00:08 +0300 +Subject: [PATCH 3/6] Factor out try_pop + +Signed-off-by: Alexei Katranov +--- + include/oneapi/tbb/concurrent_queue.h | 56 +++++++++++++-------------- + 1 file changed, 26 insertions(+), 30 deletions(-) + +diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h +index 8083b5a2db..45c8cf9024 100644 +--- a/include/oneapi/tbb/concurrent_queue.h ++++ b/include/oneapi/tbb/concurrent_queue.h +@@ -28,6 +28,24 @@ namespace tbb { + namespace detail { + namespace d2 { + ++template ++std::pair internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) { ++ ticket_type ticket{}; ++ do { ++ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` ++ ticket = queue.head_counter.load(std::memory_order_acquire); ++ do { ++ if (static_cast(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty ++ // Queue is empty ++ return { false, ticket }; ++ } ++ // Queue had item with ticket k when we looked. Attempt to get that item. ++ // Another thread snatched the item, retry. ++ } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1)); ++ } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc)); ++ return { true, ticket }; ++} ++ + // A high-performance thread-safe non-blocking concurrent queue. + // Multiple threads may each push and pop concurrently. + // Assignment construction is not allowed. +@@ -181,21 +199,7 @@ class concurrent_queue { + } + + bool internal_try_pop( void* dst ) { +- ticket_type k; +- do { +- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` +- k = my_queue_representation->head_counter.load(std::memory_order_acquire); +- do { +- if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) { +- // Queue is empty +- return false; +- } +- +- // Queue had item with ticket k when we looked. Attempt to get that item. +- // Another thread snatched the item, retry. +- } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1)); +- } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator)); +- return true; ++ return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first; + } + + template +@@ -513,22 +517,14 @@ class concurrent_bounded_queue { + } + + bool internal_pop_if_present( void* dst ) { +- ticket_type ticket; +- do { +- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` +- ticket = my_queue_representation->head_counter.load(std::memory_order_acquire); +- do { +- if (static_cast(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty +- // Queue is empty +- return false; +- } +- // Queue had item with ticket k when we looked. Attempt to get that item. +- // Another thread snatched the item, retry. +- } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1)); +- } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator)); ++ bool present{}; ++ ticket_type ticket{}; ++ std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator); + +- r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket); +- return true; ++ if (present) { ++ r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket); ++ } ++ return present; + } + + void internal_abort() { + +From 14532b2b7551f9a20653b7f4f692f8e52b35adbb Mon Sep 17 00:00:00 2001 +From: Alex +Date: Tue, 1 Mar 2022 15:09:42 +0300 +Subject: [PATCH 4/6] Update test_concurrent_queue_whitebox.cpp + +Fix copyright year +--- + test/tbb/test_concurrent_queue_whitebox.cpp | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp +index 59ce7f54cb..1ba1530e70 100644 +--- a/test/tbb/test_concurrent_queue_whitebox.cpp ++++ b/test/tbb/test_concurrent_queue_whitebox.cpp +@@ -1,5 +1,5 @@ + /* +- Copyright (c) 2005-2021 Intel Corporation ++ Copyright (c) 2005-2022 Intel Corporation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + +From 073cac5d77fc7045aad2f96d92b6030d71617cf4 Mon Sep 17 00:00:00 2001 +From: pavelkumbrasev +Date: Fri, 4 Nov 2022 08:52:17 +0000 +Subject: [PATCH 6/6] Apply review suggestions + +Signed-off-by: pavelkumbrasev +--- + include/oneapi/tbb/concurrent_queue.h | 2 +- + include/oneapi/tbb/detail/_concurrent_queue_base.h | 2 +- + 2 files changed, 2 insertions(+), 2 deletions(-) + +diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h +index 0a821efa2a..d694bd2b47 100644 +--- a/include/oneapi/tbb/concurrent_queue.h ++++ b/include/oneapi/tbb/concurrent_queue.h +@@ -32,7 +32,7 @@ template + std::pair internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) { + ticket_type ticket{}; + do { +- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head` ++ // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter` + ticket = queue.head_counter.load(std::memory_order_acquire); + do { + if (static_cast(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty +diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h +index b51b2e419b..b868797985 100644 +--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h ++++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h +@@ -336,7 +336,7 @@ class micro_queue { + } + + void spin_wait_until_my_turn( std::atomic& counter, ticket_type k, queue_rep_type& rb ) const { +- for (atomic_backoff b(true);; b.pause()) { ++ for (atomic_backoff b{};; b.pause()) { + ticket_type c = counter.load(std::memory_order_acquire); + if (c == k) return; + else if (c & 1) { diff --git a/tbb.spec b/tbb.spec index d0b23c2a6eb..9c0269483fb 100644 --- a/tbb.spec +++ b/tbb.spec @@ -5,11 +5,13 @@ %define github_user oneapi-src %define github_repo oneTBB Source: git+https://github.com/%{github_user}/%{github_repo}.git?obj=%{branch}/%{tag}&export=%{n}-%{realversion}&output=/%{n}-%{branch}-%{tag}.tgz +Patch0: tbb-782 Requires: hwloc BuildRequires: cmake %prep %setup -n %{n}-%{realversion} +%patch0 -p1 %build rm -rf %{_builddir}/build