diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp
index cb70cdd5068..58e26d16e74 100644
--- a/src/tbb/arena.cpp
+++ b/src/tbb/arena.cpp
@@ -104,7 +104,7 @@ std::size_t arena::occupy_free_slot(thread_data& tls) {
 
 std::uintptr_t arena::calculate_stealing_threshold() {
     stack_anchor_type anchor;
-    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
+    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_permit_manager->worker_stack_size());
 }
 
 void arena::process(thread_data& tls) {
@@ -164,12 +164,12 @@ void arena::process(thread_data& tls) {
 }
 
 // arena::arena (permit_manager& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
-arena::arena (market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
+arena::arena (permit_manager& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
 {
     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
-    my_market = &m;
+    my_permit_manager = &m;
     my_limit = 1;
     // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
     my_num_slots = num_arena_slots(num_slots);
@@ -206,7 +206,7 @@ arena::arena (market& m, unsigned num_slots, unsigned num_reserved_slots, unsign
 #endif
 }
 
-arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
+arena& arena::allocate_arena( permit_manager& m, unsigned num_slots, unsigned num_reserved_slots,
                               unsigned priority_level, unsigned epoch )
 {
     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
@@ -252,7 +252,7 @@ void arena::free_arena () {
     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
 #endif
     // remove an internal reference
-    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
+    my_permit_manager->release( /*is_public=*/false, /*blocking_terminate=*/false );
 
     // Clear enforces synchronization with observe(false)
     my_observers.clear();
@@ -276,7 +276,7 @@ bool arena::is_out_of_work() {
     if (my_local_concurrency_flag.try_clear_if([this] {
             return !has_enqueued_tasks();
         })) {
-        my_market->adjust_demand(*this->my_client, /* delta = */ -1, /* mandatory = */ true);
+        my_permit_manager->adjust_demand(*this->my_client, /* delta = */ -1, /* mandatory = */ true);
     }
 #endif
@@ -327,7 +327,7 @@ bool arena::is_out_of_work() {
                 if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                     // This thread transitioned pool to empty state, and thus is
                     // responsible for telling the market that there is no work to do.
-                    my_market->adjust_demand(*this->my_client, -current_demand, /* mandatory = */ false);
+                    my_permit_manager->adjust_demand(*this->my_client, -current_demand, /* mandatory = */ false);
                     return true;
                 }
                 return false;
@@ -379,12 +379,12 @@ void assert_arena_priority_valid( tbb::task_arena::priority ) {}
 
 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
     assert_arena_priority_valid( a_priority );
-    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
+    return permit_manager::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
 }
 
 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
     auto priority = tbb::task_arena::priority(
-        (market::num_priority_levels - priority_level) * d1::priority_stride
+        (permit_manager::num_priority_levels - priority_level) * d1::priority_stride
     );
     assert_arena_priority_valid( priority );
     return priority;
@@ -445,10 +445,10 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {
     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
     unsigned priority_level = arena_priority_level(ta.my_priority);
-    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
+    arena* a = permit_manager::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
     ta.my_arena.store(a, std::memory_order_release);
     // add an internal market reference; a public reference was added in create_arena
-    market::global_market( /*is_public=*/false);
+    permit_manager::global_market( /*is_public=*/false);
 #if __TBB_ARENA_BINDING
     a->my_numa_binding_observer = construct_binding_observer(
         static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
@@ -458,7 +458,7 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {
 void task_arena_impl::terminate(d1::task_arena_base& ta) {
     arena* a = ta.my_arena.load(std::memory_order_relaxed);
     assert_pointer_valid(a);
-    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
+    a->my_permit_manager->release( /*is_public=*/true, /*blocking_terminate=*/false );
     a->on_thread_leaving<arena::ref_external>();
     ta.my_arena.store(nullptr, std::memory_order_relaxed);
 }
@@ -478,7 +478,7 @@ bool task_arena_impl::attach(d1::task_arena_base& ta) {
         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, nullptr);
         ta.my_arena.store(a, std::memory_order_release);
         // increases market's ref count for task_arena
-        market::global_market( /*is_public=*/true );
+        permit_manager::global_market( /*is_public=*/true );
         return true;
     }
     return false;
@@ -520,7 +520,7 @@ class nested_arena_context : no_copy {
            // If the calling thread occupies a slot out of the external thread reserve we need to notify the
            // market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
-                td.my_arena->my_market->adjust_demand(*td.my_arena->my_client, /* delta = */ -1, /* mandatory = */ false);
+                td.my_arena->my_permit_manager->adjust_demand(*td.my_arena->my_client, /* delta = */ -1, /* mandatory = */ false);
            }
 
            td.my_last_observer = nullptr;
@@ -556,7 +556,7 @@ class nested_arena_context : no_copy {
            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
-                td.my_arena->my_market->adjust_demand(*td.my_arena->my_client, /* delta = */ 1, /* mandatory = */ false);
+                td.my_arena->my_permit_manager->adjust_demand(*td.my_arena->my_client, /* delta = */ 1, /* mandatory = */ false);
            }
 
            td.leave_task_dispatcher();
diff --git a/src/tbb/arena.h b/src/tbb/arena.h
index 9c934ce28b0..5e74aca50f3 100644
--- a/src/tbb/arena.h
+++ b/src/tbb/arena.h
@@ -28,11 +28,11 @@
 #include "arena_slot.h"
 #include "rml_tbb.h"
 #include "mailbox.h"
-#include "market.h"
 #include "governor.h"
 #include "concurrent_monitor.h"
 #include "observer_proxy.h"
 #include "oneapi/tbb/spin_mutex.h"
+#include "market_concurrent_monitor.h"
 #include "resource_manager.h"
@@ -251,7 +251,7 @@ class arena_base : padded<intrusive_list_node> {
     // Below are rarely modified members
 
     //! The market that owns this arena.
-    market* my_market;
+    permit_manager* my_permit_manager;
 
     //! Default task group context.
     d1::task_group_context* my_default_ctx;
@@ -318,10 +318,10 @@ class arena: public padded<arena_base>
     //! Constructor
     // arena (permit_manager& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);
-    arena (market& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);
+    arena (permit_manager& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);
 
     //! Allocate an instance of arena.
-    static arena& allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
+    static arena& allocate_arena( permit_manager& m, unsigned num_slots, unsigned num_reserved_slots,
                                   unsigned priority_level, unsigned epoch );
 
     static unsigned num_arena_slots ( unsigned num_slots ) {
@@ -545,7 +545,7 @@ inline void arena::on_thread_leaving ( ) {
     //
     std::uintptr_t aba_epoch = my_aba_epoch;
     unsigned priority_level = my_priority_level;
-    market* m = my_market;
+    permit_manager* m = my_permit_manager;
     __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
     // When there are no workers someone must free the arena, as
@@ -555,7 +555,7 @@ inline void arena::on_thread_leaving ( ) {
     // because it can create the demand of workers,
     // but the arena can be already empty (and so ready for destroying)
     // TODO: Fix the race: while we check soft limit and it might be changed.
-    if( ref_param==ref_external && my_num_slots != my_num_reserved_slots && m->is_global_concurrency_disabled(my_client)) {
+    if( ref_param==ref_external && my_num_slots != my_num_reserved_slots && my_num_workers_allotted.load(std::memory_order_relaxed) == 0) {
         is_out_of_work();
         // We expect that in the worst case it's enough to have num_priority_levels-1
         // calls to restore priorities and yet another is_out_of_work() to conform
@@ -570,7 +570,7 @@ inline void arena::on_thread_leaving ( ) {
     if (remaining_ref == 0) {
         if (m->try_destroy_arena(my_client, aba_epoch, priority_level)) {
             // We are requested to destroy ourselves
-            my_market->destroy_client(*my_client);
+            my_permit_manager->destroy_client(*my_client);
             free_arena();
         }
     }
@@ -585,10 +585,10 @@ void arena::advertise_new_work() {
     if( work_type == work_enqueued ) {
         atomic_fence_seq_cst();
 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
-        my_market->enable_mandatory_concurrency(my_client);
+        my_permit_manager->enable_mandatory_concurrency(my_client);
 
         if (my_max_num_workers == 0 && my_num_reserved_slots == 1 && my_local_concurrency_flag.test_and_set()) {
-            my_market->adjust_demand(*this->my_client, /* delta = */ 1, /* mandatory = */ true);
+            my_permit_manager->adjust_demand(*this->my_client, /* delta = */ 1, /* mandatory = */ true);
         }
 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
         // Local memory fence here and below is required to avoid missed wakeups; see the comment below.
@@ -626,14 +626,14 @@ void arena::advertise_new_work() {
             // telling the market that there is work to do.
 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
             if( work_type == work_spawned ) {
-                my_market->mandatory_concurrency_disable( my_client );
+                my_permit_manager->mandatory_concurrency_disable( my_client );
             }
 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
             // TODO: investigate adjusting of arena's demand by a single worker.
-            my_market->adjust_demand(*this->my_client, my_max_num_workers, /* mandatory = */ false);
+            my_permit_manager->adjust_demand(*this->my_client, my_max_num_workers, /* mandatory = */ false);
 
             // Notify all sleeping threads that work has appeared in the arena.
-            my_market->get_wait_list().notify(is_related_arena);
+            my_permit_manager->get_wait_list().notify(is_related_arena);
         }
     }
 }
diff --git a/src/tbb/market.h b/src/tbb/market.h
index 14d06e494c1..c722703df3f 100644
--- a/src/tbb/market.h
+++ b/src/tbb/market.h
@@ -231,7 +231,7 @@ class market : public permit_manager, rml::tbb_client {
                           unsigned arena_index, std::size_t stack_size );
 
     //! Removes the arena from the market's list
-    bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level );
+    bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level ) override;
 
     //! Removes the arena from the market's list
     void detach_arena (tbb_permit_manager_client& );
diff --git a/src/tbb/resource_manager.h b/src/tbb/resource_manager.h
index 558acd7c010..fbe9a732611 100644
--- a/src/tbb/resource_manager.h
+++ b/src/tbb/resource_manager.h
@@ -41,6 +41,10 @@ class permit_manager : no_copy {
     virtual void request_demand(unsigned min, unsigned max, permit_manager_client&) = 0;
     virtual void release_demand(permit_manager_client&) = 0;
+
+    //! Removes the arena from the market's list
+    virtual bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level ) = 0;
+
 };
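
Taken together, the arena.cpp and arena.h hunks apply one pattern: arena_base now stores a permit_manager* (my_permit_manager) instead of a concrete market*, so every demand and lifetime request the arena makes is dispatched through an abstract interface. The sketch below is a deliberately reduced, hypothetical model of that shape; the class and member names mirror the diff, but the signatures and bodies are simplified stand-ins, not the real oneTBB declarations.

// Hypothetical, heavily reduced model of the decoupling; names mirror the diff,
// everything else is a stand-in rather than the real oneTBB code.
#include <cstdint>
#include <iostream>

class permit_manager_client {};   // per-arena handle owned by the resource manager

// Abstract interface the arena is now coded against (see resource_manager.h).
class permit_manager {
public:
    virtual ~permit_manager() = default;
    virtual void adjust_demand(permit_manager_client&, int delta, bool mandatory) = 0;
    virtual bool try_destroy_arena(permit_manager_client*, std::uintptr_t aba_epoch,
                                   unsigned priority_level) = 0;
};

// One concrete implementation, standing in for the TBB market.
class toy_market : public permit_manager {
    int my_demand = 0;
public:
    void adjust_demand(permit_manager_client&, int delta, bool /*mandatory*/) override {
        my_demand += delta;
        std::cout << "demand is now " << my_demand << "\n";
    }
    bool try_destroy_arena(permit_manager_client*, std::uintptr_t, unsigned) override {
        return true;   // pretend the arena can always be removed
    }
};

// Arena-side code only sees the interface, mirroring my_permit_manager in arena_base.
class toy_arena {
    permit_manager* my_permit_manager;
    permit_manager_client* my_client;
public:
    toy_arena(permit_manager& m, permit_manager_client& c)
        : my_permit_manager(&m), my_client(&c) {}

    void advertise_new_work(unsigned max_num_workers) {
        // Same shape as the rewritten call sites in arena.cpp.
        my_permit_manager->adjust_demand(*my_client, int(max_num_workers), /*mandatory=*/false);
    }
    void on_thread_leaving(std::uintptr_t aba_epoch, unsigned priority_level) {
        if (my_permit_manager->try_destroy_arena(my_client, aba_epoch, priority_level)) {
            std::cout << "arena can be freed\n";
        }
    }
};

int main() {
    toy_market m;
    permit_manager_client c;
    toy_arena a(m, c);
    a.advertise_new_work(4);                        // +4 workers, non-mandatory
    a.on_thread_leaving(/*aba_epoch=*/0, /*priority_level=*/1);
}

The payoff of this inversion is visible at the top of the arena.h hunk: market.h drops out of the arena's include list, and any resource manager that implements permit_manager can drive the arena.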
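
market.h marks try_destroy_arena with override because resource_manager.h now declares the same method as a pure virtual on permit_manager, which is what allows on_thread_leaving to call it through a permit_manager*. A minimal, hypothetical reduction of those two declarations:

// Hypothetical reduction of the market.h / resource_manager.h hunks; the real
// declarations live in oneTBB's internal classes and carry more context.
#include <cstdint>

struct permit_manager_client;

struct permit_manager {
    virtual ~permit_manager() = default;
    // Promoted to the interface, as in resource_manager.h:
    virtual bool try_destroy_arena(permit_manager_client*, std::uintptr_t aba_epoch,
                                   unsigned priority_level) = 0;
};

struct market : permit_manager {
    // 'override' (the market.h change) turns any signature drift between this
    // declaration and the interface into a compile-time error instead of a
    // silently unrelated overload.
    bool try_destroy_arena(permit_manager_client*, std::uintptr_t aba_epoch,
                           unsigned priority_level) override {
        (void)aba_epoch; (void)priority_level;
        return false;   // stand-in body
    }
};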
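
Several of the rewritten call sites implement the enqueue-enforced-concurrency handshake: advertise_new_work asks for one extra mandatory worker when a task is enqueued into an arena that has no regular workers, and is_out_of_work gives it back once no enqueued tasks remain. The toy, single-threaded sketch below assumes that protocol and reduces my_local_concurrency_flag to a plain bool; the real field is an atomic flag driven by test_and_set and try_clear_if and runs concurrently.

// Hypothetical sketch of the mandatory-demand handshake seen in
// advertise_new_work() and is_out_of_work().
#include <iostream>

struct permit_manager_client {};

struct permit_manager {
    virtual ~permit_manager() = default;
    virtual void adjust_demand(permit_manager_client&, int delta, bool mandatory) = 0;
};

struct counting_manager : permit_manager {
    int mandatory_demand = 0;
    void adjust_demand(permit_manager_client&, int delta, bool mandatory) override {
        if (mandatory) mandatory_demand += delta;
    }
};

struct toy_arena {
    permit_manager* my_permit_manager;
    permit_manager_client my_client;
    bool my_local_concurrency_flag = false;   // stands in for the atomic flag
    bool has_enqueued_tasks = false;

    void enqueue_task() {
        has_enqueued_tasks = true;
        // advertise_new_work: request one mandatory worker, once.
        if (!my_local_concurrency_flag) {
            my_local_concurrency_flag = true;
            my_permit_manager->adjust_demand(my_client, +1, /*mandatory=*/true);
        }
    }
    void drain() {
        has_enqueued_tasks = false;
        // is_out_of_work: return the mandatory worker when nothing is enqueued.
        if (my_local_concurrency_flag && !has_enqueued_tasks) {
            my_local_concurrency_flag = false;
            my_permit_manager->adjust_demand(my_client, -1, /*mandatory=*/true);
        }
    }
};

int main() {
    counting_manager m;
    toy_arena a{&m};
    a.enqueue_task();
    a.drain();
    std::cout << "mandatory demand: " << m.mandatory_demand << "\n";  // prints 0
}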
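
calculate_stealing_threshold still derives its limit from the worker stack size, now obtained via my_permit_manager->worker_stack_size(). The body of r1::calculate_stealing_threshold is not part of this patch, so the snippet below assumes a simple half-stack heuristic purely for illustration.

// Hypothetical stand-in for r1::calculate_stealing_threshold; the real formula
// is not shown in this patch. Assumes stealing stays allowed while the current
// stack position is above roughly half of the worker stack.
#include <cstdint>
#include <cstddef>
#include <iostream>

std::uintptr_t calculate_stealing_threshold(std::uintptr_t stack_anchor, std::size_t stack_size) {
    return stack_anchor - stack_size / 2;   // assumed heuristic, not the oneTBB definition
}

int main() {
    int anchor;                                        // mimics stack_anchor_type anchor;
    std::size_t worker_stack_size = 4 * 1024 * 1024;   // value the permit manager would report
    std::uintptr_t threshold =
        calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), worker_stack_size);
    std::cout << std::hex << threshold << "\n";
}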