Skip to content

Commit

Permalink
Refactoring #6
Browse files Browse the repository at this point in the history
Signed-off-by: Alexei Katranov <[email protected]>
  • Loading branch information
alexey-katranov authored and pavelkumbrasev committed Dec 2, 2022
1 parent a83b45b commit 0e0d4e2
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ void arena::advertise_new_work() {
my_permit_manager->adjust_demand(*this->my_client, my_max_num_workers, /* mandatory = */ false);

// Notify all sleeping threads that work has appeared in the arena.
my_permit_manager->get_wait_list().notify(is_related_arena);
governor::get_wait_list().notify(is_related_arena);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/tbb/governor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arena.h"
#include "dynamic_link.h"
#include "concurrent_monitor.h"
#include "market_concurrent_monitor.h"

#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/global_control.h"
Expand Down Expand Up @@ -68,6 +69,7 @@ void governor::acquire_resources () {
detect_cpu_features(cpu_features);

is_rethrow_broken = gcc_rethrow_exception_broken();
sleep_monitor = new (cache_aligned_allocate(sizeof(market_concurrent_monitor))) market_concurrent_monitor;
}

void governor::release_resources () {
Expand All @@ -79,6 +81,8 @@ void governor::release_resources () {
int status = theTLS.destroy();
if( status )
runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
sleep_monitor->~market_concurrent_monitor();
cache_aligned_deallocate(sleep_monitor);
clear_address_waiter_table();

system_topology::destroy();
Expand Down
8 changes: 8 additions & 0 deletions src/tbb/governor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace r1 {
class market;
class thread_data;
class __TBB_InitOnce;
class market_concurrent_monitor;

#if __TBB_USE_ITT_NOTIFY
//! Defined in profiling.cpp
Expand Down Expand Up @@ -62,6 +63,8 @@ class governor {
static cpu_features_type cpu_features;
static bool is_rethrow_broken;

static market_concurrent_monitor* sleep_monitor;

//! Create key for thread-local storage and initialize RML.
static void acquire_resources ();

Expand Down Expand Up @@ -146,6 +149,11 @@ class governor {
return false;
#endif
}

//! Return wait list
static market_concurrent_monitor& get_wait_list() {
return *sleep_monitor;
}
}; // class governor

} // namespace r1
Expand Down
6 changes: 0 additions & 6 deletions src/tbb/market.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ class market : public permit_manager, rml::tbb_client {
//! Pointer to the RML server object that services this TBB instance.
rml::tbb_server* my_server;

//! Waiting object for external and coroutine waiters.
market_concurrent_monitor my_sleep_monitor;

//! Maximal number of workers allowed for use by the underlying resource manager
/** It can't be changed after market creation. **/
unsigned my_num_workers_hard_limit;
Expand Down Expand Up @@ -239,9 +236,6 @@ class market : public permit_manager, rml::tbb_client {
//! Decrements market's refcount and destroys it in the end
bool release ( bool is_public, bool blocking_terminate );

//! Return wait list
market_concurrent_monitor& get_wait_list() { return my_sleep_monitor; }

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
//! Imlpementation of mandatory concurrency enabling
void enable_mandatory_concurrency_impl (tbb_permit_manager_client*a );
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void task_dispatcher::do_post_resume_action() {
auto is_our_suspend_point = [sp] (market_context ctx) {
return std::uintptr_t(sp) == ctx.my_uniq_addr;
};
td->my_arena->my_market->get_wait_list().notify(is_our_suspend_point);
governor::get_wait_list().notify(is_our_suspend_point);
break;
}
default:
Expand Down Expand Up @@ -218,7 +218,7 @@ void notify_waiters(std::uintptr_t wait_ctx_addr) {
return wait_ctx_addr == context.my_uniq_addr;
};

r1::governor::get_thread_data()->my_arena->my_market->get_wait_list().notify(is_related_wait_ctx);
governor::get_wait_list().notify(is_related_wait_ctx);
}

} // namespace r1
Expand Down
2 changes: 1 addition & 1 deletion src/tbb/task_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ inline d1::task* suspend_point_type::resume_task::execute(d1::execution_data& ed
thread_data* td = ed_ext.task_disp->m_thread_data;
td->set_post_resume_action(task_dispatcher::post_resume_action::register_waiter, &monitor_node);

market_concurrent_monitor& wait_list = td->my_arena->my_market->get_wait_list();
market_concurrent_monitor& wait_list = governor::get_wait_list();

if (wait_list.wait([&] { return !ed_ext.wait_ctx->continue_execution(); }, monitor_node)) {
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/tbb/thread_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class task;
class arena_slot;
class task_group_context;
class task_dispatcher;
class tbb_permit_manager_client;

class context_list : public intrusive_list<intrusive_list_node> {
public:
Expand Down

0 comments on commit 0e0d4e2

Please sign in to comment.