Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix tbb initialize #2791

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/daal/include/services/env_detect.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ class DAAL_EXPORT Environment : public Base
void initNumberOfThreads();

env _env;
// Pointer to the oneapi::tbb::task_scheduler_handle object, global for oneDAL.
// Together, oneapi::tbb::task_scheduler_handle and the oneapi::tbb::finalize function
// allow the user to wait for the completion of worker threads.
void * _schedulerHandle;
void * _globalControl;
SharedPtr<services::internal::sycl::ExecutionContextIface> _executionContext;
};
Expand Down
32 changes: 28 additions & 4 deletions cpp/daal/src/externals/core_threading_win_dll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr);

// Function-pointer types for entry points of the threading library.
// Pointers of these types are filled in on first use through load_daal_thr_func().
typedef bool (*_daal_is_in_parallel_t)();
// Takes the opaque global-control pointer by reference.
typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl);
// Takes the opaque scheduler-handle pointer by reference.
typedef void (*_daal_tbb_task_scheduler_handle_free_t)(void *& schedulerHandle);
// (requested thread count, address of the opaque global-control pointer) -> size_t.
typedef size_t (*_setNumberOfThreads_t)(const size_t, void **);
// (address of the opaque scheduler-handle pointer) -> size_t.
typedef size_t (*_setSchedulerHandle_t)(void **);
typedef void * (*_daal_threader_env_t)();

typedef void (*_daal_parallel_sort_int32_t)(int *, int *);
Expand Down Expand Up @@ -205,10 +207,12 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL;
static _daal_run_task_group_t _daal_run_task_group_ptr = NULL;
static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL;

// Cached addresses of threading-library entry points; NULL until first resolved.
// NOTE(review): the next four declarations are repeated in the group that follows.
// Presumably these are the removed and added sides of a diff hunk rendered without
// +/- markers - confirm against the repository before treating both groups as live code.
static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;
// Same pointers plus the two scheduler-handle entry points
// (_daal_tbb_task_scheduler_handle_free_ptr and _setSchedulerHandle_ptr).
static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL;
static _daal_tbb_task_scheduler_handle_free_t _daal_tbb_task_scheduler_handle_free_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _setSchedulerHandle_t _setSchedulerHandle_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;

static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL;
static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL;
Expand Down Expand Up @@ -657,6 +661,16 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init)
return _daal_tbb_task_scheduler_free_ptr(init);
}

// Trampoline for "_daal_tbb_task_scheduler_handle_free" in the threading DLL.
// Loads the DLL, resolves the symbol on first use, caches it, and forwards `init`.
// NOTE(review): the resolved pointer is not checked before the call; presumably
// load_daal_thr_func() deals with a missing symbol itself - confirm.
DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& init)
{
    load_daal_thr_dll();

    if (!_daal_tbb_task_scheduler_handle_free_ptr)
    {
        _daal_tbb_task_scheduler_handle_free_ptr =
            (_daal_tbb_task_scheduler_handle_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_handle_free");
    }

    _daal_tbb_task_scheduler_handle_free_ptr(init);
}

DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init)
{
load_daal_thr_dll();
Expand All @@ -667,6 +681,16 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init)
return _setNumberOfThreads_ptr(numThreads, init);
}

// Trampoline for "_setSchedulerHandle" in the threading DLL.
// Loads the DLL, resolves the symbol on first use, caches it, and returns
// whatever the resolved function returns for `init`.
// NOTE(review): the resolved pointer is not checked before the call; presumably
// load_daal_thr_func() deals with a missing symbol itself - confirm.
DAAL_EXPORT size_t _setSchedulerHandle(void ** init)
{
    load_daal_thr_dll();

    if (!_setSchedulerHandle_ptr)
    {
        _setSchedulerHandle_ptr = (_setSchedulerHandle_t)load_daal_thr_func("_setSchedulerHandle");
    }

    const size_t status = _setSchedulerHandle_ptr(init);
    return status;
}

DAAL_EXPORT void * _daal_threader_env()
{
load_daal_thr_dll();
Expand Down
15 changes: 12 additions & 3 deletions cpp/daal/src/services/env_detect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi
initNumberOfThreads();
}

DAAL_EXPORT daal::services::Environment::Environment() : _globalControl {}
DAAL_EXPORT daal::services::Environment::Environment() : _schedulerHandle {}, _globalControl {}
{
_env.cpuid_init_flag = false;
_env.cpuid = -1;
Expand All @@ -137,7 +137,14 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da
DAAL_EXPORT void daal::services::Environment::initNumberOfThreads()
{
if (isInit) return;

// Initializes global oneapi::tbb::task_scheduler_handle object in oneDAL to prevent the unexpected
// destruction of the calling thread.
// When the oneapi::tbb::finalize function is called with an oneapi::tbb::task_scheduler_handle
// instance, it blocks the calling thread until the completion of all worker
// threads that were implicitly created by the library.
#if defined(TARGET_X86_64)
daal::setSchedulerHandle(&_schedulerHandle);
#endif
/* if HT enabled - set _numThreads to physical cores num */
if (daal::internal::ServiceInst::serv_get_ht())
{
Expand All @@ -156,7 +163,6 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads()
// Destructor: frees the service buffers, then passes _globalControl to
// _daal_tbb_task_scheduler_free().
// NOTE(review): _schedulerHandle is not passed to _daal_tbb_task_scheduler_handle_free()
// here - confirm that leaving the scheduler handle alive is intentional.
// NOTE(review): this text comes from a diff view without +/- markers; the hunk header
// (7 lines -> 6 lines) suggests one line of this span, presumably the
// _daal_tbb_task_scheduler_free() call, was removed by the change - verify against the repository.
DAAL_EXPORT daal::services::Environment::~Environment()
{
daal::services::daal_free_buffers();
_daal_tbb_task_scheduler_free(_globalControl);
}

void daal::services::Environment::_cpu_detect(int enable)
Expand All @@ -171,6 +177,9 @@ void daal::services::Environment::_cpu_detect(int enable)
// Applies an explicit thread count.
//   numThreads - value forwarded to daal::setNumberOfThreads() together with the
//                address of the _globalControl member.
// Marks the environment as initialized first, so a later initNumberOfThreads()
// returns early. On x86-64 builds the scheduler handle is set up before the
// thread count is applied.
DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads)
{
    isInit = true;

#ifdef TARGET_X86_64
    void ** const handleSlot = &_schedulerHandle;
    daal::setSchedulerHandle(handleSlot);
#endif

    void ** const controlSlot = &_globalControl;
    daal::setNumberOfThreads(numThreads, controlSlot);
}

Expand Down
3 changes: 3 additions & 0 deletions cpp/daal/src/threading/service_thread_pinner.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class thread_pinner_impl_t : public tbb::task_scheduler_observer
thread_pinner_impl_t::thread_pinner_impl_t(void (*read_topo)(int &, int &, int &, int **), void (*deleter)(void *))
: pinner_arena(nthreads = daal::threader_get_threads_number()), tbb::task_scheduler_observer(pinner_arena), topo_deleter(deleter)
{
#if defined(TARGET_X86_64)
pinner_arena.initialize();
#endif
do_pinning = (nthreads > 0) ? true : false;
is_pinning.set(0);

Expand Down
25 changes: 25 additions & 0 deletions cpp/daal/src/threading/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl)
#endif
}

// Destroys the tbb::task_scheduler_handle stored behind `schedulerHandle` and
// resets the pointer to nullptr. A null pointer is left untouched.
// Compiles to a no-op unless both TARGET_X86_64 and __DO_TBB_LAYER__ are defined.
DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle)
{
#if defined(TARGET_X86_64) && defined(__DO_TBB_LAYER__)
    if (schedulerHandle == nullptr)
    {
        return;
    }

    tbb::task_scheduler_handle * const handle = reinterpret_cast<tbb::task_scheduler_handle *>(schedulerHandle);
    delete handle;
    schedulerHandle = nullptr;
#endif
}

DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl)
{
#if defined(__DO_TBB_LAYER__)
Expand All @@ -92,6 +105,18 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo
return 1;
}

// Attaches to the TBB scheduler and stores the resulting handle in *schedulerHandle.
//
//   schedulerHandle - address of an opaque pointer that is either nullptr or a
//                     tbb::task_scheduler_handle previously created by this function.
//                     May itself be nullptr, in which case no handle is stored.
//   returns         - always 0.
//
// The handle is created only when the slot is still empty. This function is called
// more than once on the same slot (from both initNumberOfThreads() and
// setNumberOfThreads()); allocating unconditionally overwrote the previous pointer
// and leaked the earlier handle on every repeated call.
// Compiles to `return 0;` unless both TARGET_X86_64 and __DO_TBB_LAYER__ are defined.
DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle)
{
#if defined(TARGET_X86_64)
    #if defined(__DO_TBB_LAYER__)
    if (schedulerHandle != nullptr && *schedulerHandle == nullptr)
    {
        *schedulerHandle = static_cast<void *>(new tbb::task_scheduler_handle(tbb::attach {}));
    }
    // It is necessary for initializing tbb in cases where DAAL does not use it.
    tbb::task_arena {}.initialize();
    #endif
#endif
    return 0;
}

DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func)
{
#if defined(__DO_TBB_LAYER__)
Expand Down
7 changes: 7 additions & 0 deletions cpp/daal/src/threading/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ extern "C"
DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr);

// Releases the object behind the opaque global-control pointer.
DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl);
// Deletes the tbb::task_scheduler_handle behind the pointer (if any) and nulls the pointer.
DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle);
// Applies the requested thread count; globalControl is the address of the opaque global-control pointer.
DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl);
// Stores a tbb::task_scheduler_handle in *schedulerHandle (TBB layer on x86-64 only); returns 0.
DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle);

DAAL_EXPORT void * _daal_threader_env();

Expand Down Expand Up @@ -183,6 +185,11 @@ inline size_t threader_get_threads_number()
return threader_env()->getNumberOfThreads();
}

// daal-namespace convenience wrapper: hands `schedulerHandle` to the exported
// _setSchedulerHandle() entry point and returns its result unchanged.
inline size_t setSchedulerHandle(void ** schedulerHandle)
{
    const size_t status = _setSchedulerHandle(schedulerHandle);
    return status;
}

inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl)
{
return _setNumberOfThreads(numThreads, globalControl);
Expand Down
Loading