Skip to content

Commit

Permalink
Refactoring #5
Browse files Browse the repository at this point in the history
Signed-off-by: pavelkumbrasev <[email protected]>
  • Loading branch information
pavelkumbrasev committed Nov 24, 2022
1 parent a5bd41c commit 72ed179
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 28 deletions.
30 changes: 15 additions & 15 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ std::size_t arena::occupy_free_slot(thread_data& tls) {

std::uintptr_t arena::calculate_stealing_threshold() {
stack_anchor_type anchor;
return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_permit_manager->worker_stack_size());
}

void arena::process(thread_data& tls) {
Expand Down Expand Up @@ -164,12 +164,12 @@ void arena::process(thread_data& tls) {
}

// arena::arena (permit_manager& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
arena::arena (market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
arena::arena (permit_manager& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch)
{
__TBB_ASSERT( !my_guard, "improperly allocated arena?" );
__TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
__TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
my_market = &m;
my_permit_manager = &m;
my_limit = 1;
// Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
my_num_slots = num_arena_slots(num_slots);
Expand Down Expand Up @@ -206,7 +206,7 @@ arena::arena (market& m, unsigned num_slots, unsigned num_reserved_slots, unsign
#endif
}

arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
arena& arena::allocate_arena( permit_manager& m, unsigned num_slots, unsigned num_reserved_slots,
unsigned priority_level, unsigned epoch )
{
__TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
Expand Down Expand Up @@ -252,7 +252,7 @@ void arena::free_arena () {
__TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
// remove an internal reference
my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
my_permit_manager->release( /*is_public=*/false, /*blocking_terminate=*/false );

// Clear enfources synchronization with observe(false)
my_observers.clear();
Expand All @@ -276,7 +276,7 @@ bool arena::is_out_of_work() {
if (my_local_concurrency_flag.try_clear_if([this] {
return !has_enqueued_tasks();
})) {
my_market->adjust_demand(*this->my_client, /* delta = */ -1, /* mandatory = */ true);
my_permit_manager->adjust_demand(*this->my_client, /* delta = */ -1, /* mandatory = */ true);
}
#endif

Expand Down Expand Up @@ -327,7 +327,7 @@ bool arena::is_out_of_work() {
if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
// This thread transitioned pool to empty state, and thus is
// responsible for telling the market that there is no work to do.
my_market->adjust_demand(*this->my_client, -current_demand, /* mandatory = */ false);
my_permit_manager->adjust_demand(*this->my_client, -current_demand, /* mandatory = */ false);
return true;
}
return false;
Expand Down Expand Up @@ -379,12 +379,12 @@ void assert_arena_priority_valid( tbb::task_arena::priority ) {}

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
assert_arena_priority_valid( a_priority );
return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
return permit_manager::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
auto priority = tbb::task_arena::priority(
(market::num_priority_levels - priority_level) * d1::priority_stride
(permit_manager::num_priority_levels - priority_level) * d1::priority_stride
);
assert_arena_priority_valid( priority );
return priority;
Expand Down Expand Up @@ -451,10 +451,10 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {

__TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
unsigned priority_level = arena_priority_level(ta.my_priority);
arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
arena* a = permit_manager::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
ta.my_arena.store(a, std::memory_order_release);
// add an internal market reference; a public reference was added in create_arena
market::global_market( /*is_public=*/false);
permit_manager::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
a->my_numa_binding_observer = construct_binding_observer(
static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
Expand All @@ -464,7 +464,7 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {
void task_arena_impl::terminate(d1::task_arena_base& ta) {
arena* a = ta.my_arena.load(std::memory_order_relaxed);
assert_pointer_valid(a);
a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
a->my_permit_manager->release( /*is_public=*/true, /*blocking_terminate=*/false );
a->on_thread_leaving<arena::ref_external>();
ta.my_arena.store(nullptr, std::memory_order_relaxed);
}
Expand All @@ -484,7 +484,7 @@ bool task_arena_impl::attach(d1::task_arena_base& ta) {
__TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, nullptr);
ta.my_arena.store(a, std::memory_order_release);
// increases market's ref count for task_arena
market::global_market( /*is_public=*/true );
permit_manager::global_market( /*is_public=*/true );
return true;
}
return false;
Expand Down Expand Up @@ -526,7 +526,7 @@ class nested_arena_context : no_copy {
// If the calling thread occupies the slots out of external thread reserve we need to notify the
// market that this arena requires one worker less.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
td.my_arena->my_market->adjust_demand(*td.my_arena->my_client, /* delta = */ -1, /* mandatory = */ false);
td.my_arena->my_permit_manager->adjust_demand(*td.my_arena->my_client, /* delta = */ -1, /* mandatory = */ false);
}

td.my_last_observer = nullptr;
Expand Down Expand Up @@ -562,7 +562,7 @@ class nested_arena_context : no_copy {
// Notify the market that this thread releasing a one slot
// that can be used by a worker thread.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
td.my_arena->my_market->adjust_demand(*td.my_arena->my_client, /* delta = */ 1, /* mandatory = */ false);
td.my_arena->my_permit_manager->adjust_demand(*td.my_arena->my_client, /* delta = */ 1, /* mandatory = */ false);
}

td.leave_task_dispatcher();
Expand Down
24 changes: 12 additions & 12 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
#include "arena_slot.h"
#include "rml_tbb.h"
#include "mailbox.h"
#include "market.h"
#include "governor.h"
#include "concurrent_monitor.h"
#include "observer_proxy.h"
#include "oneapi/tbb/spin_mutex.h"
#include "market_concurrent_monitor.h"

#include "resource_manager.h"

Expand Down Expand Up @@ -251,7 +251,7 @@ class arena_base : padded<intrusive_list_node> {
// Below are rarely modified members

//! The market that owns this arena.
market* my_market;
permit_manager* my_permit_manager;

//! Default task group context.
d1::task_group_context* my_default_ctx;
Expand Down Expand Up @@ -318,10 +318,10 @@ class arena: public padded<arena_base>

//! Constructor
// arena (permit_manager& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);
arena (market& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);
arena (permit_manager& m, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level, uintptr_t aba_epoch);

//! Allocate an instance of arena.
static arena& allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
static arena& allocate_arena( permit_manager& m, unsigned num_slots, unsigned num_reserved_slots,
unsigned priority_level, unsigned epoch );

static int unsigned num_arena_slots ( unsigned num_slots ) {
Expand Down Expand Up @@ -545,7 +545,7 @@ inline void arena::on_thread_leaving ( ) {
//
std::uintptr_t aba_epoch = my_aba_epoch;
unsigned priority_level = my_priority_level;
market* m = my_market;
permit_manager* m = my_permit_manager;
__TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
// When there is no workers someone must free arena, as
Expand All @@ -555,7 +555,7 @@ inline void arena::on_thread_leaving ( ) {
// because it can create the demand of workers,
// but the arena can be already empty (and so ready for destroying)
// TODO: Fix the race: while we check soft limit and it might be changed.
if( ref_param==ref_external && my_num_slots != my_num_reserved_slots && m->is_global_concurrency_disabled(my_client)) {
if( ref_param==ref_external && my_num_slots != my_num_reserved_slots && my_num_workers_allotted.load(std::memory_order_relaxed) == 0) {
is_out_of_work();
// We expect, that in worst case it's enough to have num_priority_levels-1
// calls to restore priorities and yet another is_out_of_work() to conform
Expand All @@ -570,7 +570,7 @@ inline void arena::on_thread_leaving ( ) {
if (remaining_ref == 0) {
if (m->try_destroy_arena(my_client, aba_epoch, priority_level)) {
// We are requested to destroy ourself
my_market->destroy_client(*my_client);
my_permit_manager->destroy_client(*my_client);
free_arena();
}
}
Expand All @@ -585,10 +585,10 @@ void arena::advertise_new_work() {
if( work_type == work_enqueued ) {
atomic_fence_seq_cst();
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
my_market->enable_mandatory_concurrency(my_client);
my_permit_manager->enable_mandatory_concurrency(my_client);

if (my_max_num_workers == 0 && my_num_reserved_slots == 1 && my_local_concurrency_flag.test_and_set()) {
my_market->adjust_demand(*this->my_client, /* delta = */ 1, /* mandatory = */ true);
my_permit_manager->adjust_demand(*this->my_client, /* delta = */ 1, /* mandatory = */ true);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
// Local memory fence here and below is required to avoid missed wakeups; see the comment below.
Expand Down Expand Up @@ -626,14 +626,14 @@ void arena::advertise_new_work() {
// telling the market that there is work to do.
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
if( work_type == work_spawned ) {
my_market->mandatory_concurrency_disable( my_client );
my_permit_manager->mandatory_concurrency_disable( my_client );
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
// TODO: investigate adjusting of arena's demand by a single worker.
my_market->adjust_demand(*this->my_client, my_max_num_workers, /* mandatory = */ false);
my_permit_manager->adjust_demand(*this->my_client, my_max_num_workers, /* mandatory = */ false);

// Notify all sleeping threads that work has appeared in the arena.
my_market->get_wait_list().notify(is_related_arena);
my_permit_manager->get_wait_list().notify(is_related_arena);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/tbb/market.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class market : public permit_manager, rml::tbb_client {
unsigned arena_index, std::size_t stack_size );

//! Removes the arena from the market's list
bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level );
bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level ) override;

//! Removes the arena from the market's list
void detach_arena (tbb_permit_manager_client& );
Expand Down
4 changes: 4 additions & 0 deletions src/tbb/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class permit_manager : no_copy {

virtual void request_demand(unsigned min, unsigned max, permit_manager_client&) = 0;
virtual void release_demand(permit_manager_client&) = 0;

//! Removes the arena from the market's list
virtual bool try_destroy_arena (permit_manager_client*, uintptr_t aba_epoch, unsigned priority_level ) = 0;

};


Expand Down

0 comments on commit 72ed179

Please sign in to comment.