From 69a50ada1eed0d6711a379e6f7db4547395a1056 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 6 May 2020 09:46:59 +0200 Subject: [PATCH 01/20] replace TPoolManager by RTaskArenaWrapper Replace all uses of TPoolManager, based on the deprecated tbb_task_scheduler_init and its implicit task_arena, by explicit manipulation of our own central instance of task_arena, as suggested by intel: https://software.intel.com/sites/default/files/managed/b2/d2/TBBRevamp.pdf Work out some gymnastics to keep tbb out of the headers, encapsulating it within the new RTaskArenaWrapper class. Couldn't find a solution for the forward-declaration tbb::task_arena, which is an alias of a versioned namespace dependent class that we are forced to forward declare instead (tbb::interface7::tbb_task_arena). --- core/imt/CMakeLists.txt | 4 +- core/imt/inc/ROOT/RTaskArena.hxx | 87 +++++++++++++++++ core/imt/inc/ROOT/TPoolManager.hxx | 77 --------------- core/imt/inc/ROOT/TThreadExecutor.hxx | 6 +- core/imt/src/RTaskArena.cxx | 129 ++++++++++++++++++++++++++ core/imt/src/TImplicitMT.cxx | 18 ++-- core/imt/src/TThreadExecutor.cxx | 31 +++---- 7 files changed, 241 insertions(+), 111 deletions(-) create mode 100644 core/imt/inc/ROOT/RTaskArena.hxx delete mode 100644 core/imt/inc/ROOT/TPoolManager.hxx create mode 100644 core/imt/src/RTaskArena.cxx diff --git a/core/imt/CMakeLists.txt b/core/imt/CMakeLists.txt index 495bb41180a78..59634b3721320 100644 --- a/core/imt/CMakeLists.txt +++ b/core/imt/CMakeLists.txt @@ -25,8 +25,8 @@ target_link_libraries(Imt PRIVATE Thread INTERFACE Core) if(imt) ROOT_GENERATE_DICTIONARY(G__Imt STAGE1 ROOT/TFuture.hxx - ROOT/TPoolManager.hxx ROOT/TTaskGroup.hxx + ROOT/RTaskArena.hxx ROOT/TThreadExecutor.hxx LINKDEF LinkDef.h @@ -41,8 +41,8 @@ if(imt) # G__Imt.cxx is automatically added by ROOT_GENERATE_DICTIONARY() target_sources(Imt PRIVATE + src/RTaskArena.cxx src/TImplicitMT.cxx - src/TPoolManager.cxx src/TThreadExecutor.cxx ) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx new file mode 100644 index 0000000000000..60b733c4fdc4f --- /dev/null +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -0,0 +1,87 @@ +// @(#)root/thread:$Id$ +// // Author: Xavier Valls Pla 08/05/20 +// +/************************************************************************* + * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +////////////////////////////////////////////////////////////////////////// +// // +// RTaskArena // +// // +// This file implements the method to initialize and retrieve ROOT's // +// global task arena, together with a method to check for active // +// CPU bandwith control, and a class to wrap the tbb task arena with // +// the purpose of keeping tbb off the installed headers // +// // +////////////////////////////////////////////////////////////////////////// + +#ifndef ROOT_RTaskArena +#define ROOT_RTaskArena + +#include "RConfigure.h" +#include + +// exclude in case ROOT does not have IMT support +#ifndef R__USE_IMT +// No need to error out for dictionaries. +# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY) +# error "Cannot use ROOT::Internal::RTaskArenaWrapper without defining R__USE_IMT." +# endif +#else + +/// tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow +/// to forward declare tbb::task_arena without forward declaring tbb::interface7 +namespace tbb{ +namespace interface7{class task_arena;} +using task_arena = interface7::task_arena; +} + +namespace ROOT { +namespace Internal { + +//////////////////////////////////////////////////////////////////////////////// +/// Wrapper for tbb::task_arena. +/// +/// Necessary in order to keep tbb away from ROOT headers +//////////////////////////////////////////////////////////////////////////////// +class RTaskArenaWrapper { +public: + RTaskArenaWrapper(); + unsigned TaskArenaSize(); + std::unique_ptr &Access(); +private: + std::unique_ptr fTBBArena; +}; + + +//////////////////////////////////////////////////////////////////////////////// +// Factory function returning a shared pointer to the instance of the global +// RTaskArenaWrapper. +// +// Allows for reinstantiation of the global RTaskArenaWrapper once all the +// references to the previous one are gone and the object destroyed. +//////////////////////////////////////////////////////////////////////////////// +std::shared_ptr GetGlobalTaskArena(); + +//////////////////////////////////////////////////////////////////////////////// +/// Initializes the global instance of tbb::task_arena and returns a shared_ptr to +/// its singleton wrapper +/// +/// * Always initializes with the available number of threads +/// * Can't be reinitialized +/// * Checks for CPU bandwidth control +/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads, +/// which is CPU affinity aware +//////////////////////////////////////////////////////////////////////////////// +std::shared_ptr InitGlobalTaskArena(unsigned maxConcurrency); + +} // namespace Internal +} // namespace ROOT + +#endif // R__USE_IMT +#endif // ROOT_RTaskArena diff --git a/core/imt/inc/ROOT/TPoolManager.hxx b/core/imt/inc/ROOT/TPoolManager.hxx deleted file mode 100644 index ee3f934d5e329..0000000000000 --- a/core/imt/inc/ROOT/TPoolManager.hxx +++ /dev/null @@ -1,77 +0,0 @@ -// @(#)root/thread:$Id$ -// Author: Xavier Valls January 2017 - -/************************************************************************* - * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. * - * All rights reserved. * - * * - * For the licensing terms see $ROOTSYS/LICENSE. * - * For the list of contributors see $ROOTSYS/README/CREDITS. * - *************************************************************************/ - -////////////////////////////////////////////////////////////////////////// -// // -// TPoolManager // -// // -////////////////////////////////////////////////////////////////////////// - - -#ifndef ROOT_TPoolManager -#define ROOT_TPoolManager - -#include "RConfigure.h" -#include "Rtypes.h" - -// exclude in case ROOT does not have IMT support -#ifndef R__USE_IMT -// No need to error out for dictionaries. -# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY) -# error "Cannot use ROOT::TPoolManager without defining R__USE_IMT." -# endif -#else - -#include - -namespace tbb { - class task_scheduler_init; -} - -namespace ROOT { - namespace Internal { - /** - \class ROOT::TPoolManager - \ingroup TPoolManager - \brief A manager for the scheduler behind ROOT multithreading operations. - - A manager for the multithreading scheduler that solves undefined behaviours and interferences between - classes and functions that made direct use of the scheduler, such as EnableImplicitMT() ot TThreadExecutor. - */ - - class TPoolManager { - public: - friend std::shared_ptr GetPoolManager(UInt_t nThreads); - /// Returns the number of threads running when the scheduler has been instantiated within ROOT. - static UInt_t GetPoolSize(); - /// Terminates the scheduler instantiated within ROOT. - ~TPoolManager(); - private: - ///Initializes the scheduler within ROOT. If the scheduler has already been initialized by the - /// user before invoking the constructor it won't change its behaviour and it won't terminate it, - /// but it will still keep record of the number of threads passed as a parameter. - TPoolManager(UInt_t nThreads = 0); - static UInt_t fgPoolSize; - bool mustDelete = true; - tbb::task_scheduler_init *fSched = nullptr; - }; - /// Get a shared pointer to the manager. Initialize the manager with nThreads if not active. If active, - /// the number of threads, even if specified otherwise, will remain the same. - /// - /// The number of threads will be able to change calling the factory function again after the last - /// remaining shared_ptr owning the object is destroyed or reasigned, which will trigger the destructor of the manager. - std::shared_ptr GetPoolManager(UInt_t nThreads = 0); - } -} - -#endif // R__USE_IMT - -#endif diff --git a/core/imt/inc/ROOT/TThreadExecutor.hxx b/core/imt/inc/ROOT/TThreadExecutor.hxx index d07a0e84f7094..979ba14f6edba 100644 --- a/core/imt/inc/ROOT/TThreadExecutor.hxx +++ b/core/imt/inc/ROOT/TThreadExecutor.hxx @@ -23,13 +23,13 @@ #else #include "ROOT/TExecutor.hxx" -#include "ROOT/TPoolManager.hxx" -#include "TROOT.h" +#include "RTaskArena.hxx" #include "TError.h" #include #include #include + namespace ROOT { class TThreadExecutor: public TExecutor { @@ -104,7 +104,7 @@ namespace ROOT { template auto SeqReduce(const std::vector &objs, R redfunc) -> decltype(redfunc(objs)); - std::shared_ptr fSched = nullptr; + std::shared_ptr fTaskArenaW = nullptr; }; /************ TEMPLATE METHODS IMPLEMENTATION ******************/ diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx new file mode 100644 index 0000000000000..9f4d93e6ca776 --- /dev/null +++ b/core/imt/src/RTaskArena.cxx @@ -0,0 +1,129 @@ +#include "ROOT/RTaskArena.hxx" +#include "TError.h" +#include "TROOT.h" +#include "TThread.h" +#include +#include +#include +#include "tbb/task_arena.h" + + +////////////////////////////////////////////////////////////////////////// +/// +/// \class ROOT::Internal::RTaskArenaWrapper +/// \ingroup Parallelism +/// \brief Wrapper over tbb::task_arena +/// +/// This class is a wrapper over tbb::task_arena, in order to keep +/// TBB away from ROOT's headers. We keep a single global instance, +/// obtained with `ROOT::Internal::GetGlobalTaskArena()`, to be used by any +/// parallel ROOT class with TBB as a backend. This has several advantages: +/// +/// - Provides a unique interface to the TBB scheduler: TThreadExecutor, +/// IMT and any class relying on TBB will get a pointer to the scheduler +/// through `ROOT::Internal::GetGlobalTaskArena()`, which will return a +/// reference to the only pointer to the TBB scheduler that will be +/// active in any ROOT Process +/// - Solves multiple undefined behaviors. Guaranteeing that all classes +/// use the same task arena avoids interferences and undefined behavior +/// by providing a single instance of the tbb::task_arena and automated +/// bookkeeping, instantiation and destruction. +/// +/// #### Examples: +/// ~~~{.cpp} +/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena() //get a shared_ptr to the global arena +/// root[] gTA->InitGlobalTaskArena(nWorkers) // Initialize the global arena and enable Thread Safety in ROOT +/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads) +/// root[] gTA->Access() //std::unique_ptr to the internal tbb::task_arena for interacting directly with it (needed to call operations such as execute) +/// root[] root[] gTA->Access()->max_concurrency() // call to tbb::task_arena::max_concurrency() +/// ~~~ +/// +////////////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////////////////// +/// Returns the available number of logical cores. +/// +/// - Checks if there is CFS bandwidth control in place (linux, via cgroups, +/// assuming standard paths) +/// - Otherwise, returns the number of logical cores provided by +/// std::thread::hardware_concurrency() +//////////////////////////////////////////////////////////////////////////////// +static Int_t LogicalCPUBandwithControl() +{ +#ifdef R__LINUX + // Check for CFS bandwith control + std::ifstream f; + std::string quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); + struct stat buffer; + // Does the file exist? + if(stat(quotaFile.c_str(), &buffer) == 0) { + f.open(quotaFile); + float cfs_quota; + f>>cfs_quota; + f.close(); + if(cfs_quota > 0) { + std::string periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); + f.open(periodFile); + float cfs_period; + f>>cfs_period; + f.close(); + return static_cast(std::ceil(cfs_quota/cfs_period)); + } + } +#endif + return std::thread::hardware_concurrency(); +} + +namespace ROOT{ +namespace Internal { + +RTaskArenaWrapper::RTaskArenaWrapper(): fTBBArena(new tbb::task_arena{}){} + +unsigned RTaskArenaWrapper::TaskArenaSize() +{ + return fTBBArena->is_active()? static_cast(fTBBArena->max_concurrency()) : 0u; +} + +std::unique_ptr &RTaskArenaWrapper::Access() +{ + return fTBBArena; +} + + +std::shared_ptr GetGlobalTaskArena() +{ + static std::weak_ptr weak_GTAWrapper; + if (weak_GTAWrapper.expired()) { + std::shared_ptr shared_GTAWrapper(new ROOT::Internal::RTaskArenaWrapper()); + weak_GTAWrapper = shared_GTAWrapper; + return weak_GTAWrapper.lock(); + } + + return weak_GTAWrapper.lock(); +} + +std::shared_ptr InitGlobalTaskArena(unsigned maxConcurrency) +{ + auto globalTBBTaskArena = GetGlobalTaskArena(); + if (!globalTBBTaskArena->Access()->is_active()) { + unsigned tbbDefaultNumberThreads = globalTBBTaskArena->Access()->max_concurrency(); // not initialized, automatic state + maxConcurrency = maxConcurrency > 1 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; + unsigned bcCpus = LogicalCPUBandwithControl(); + auto taskArenaSize = std::min({maxConcurrency, bcCpus}); + globalTBBTaskArena->Access()->initialize(taskArenaSize); + ROOT::EnableThreadSafety(); + } else { + unsigned current = globalTBBTaskArena->Access()->max_concurrency(); + if (maxConcurrency && (current != maxConcurrency)) { + Warning("InitGlobalTaskArena", "There's already an active task arena. Proceeding with the current %d threads", + current); + } + } + + return globalTBBTaskArena; +} + + +} // namespace Internal +} // namespace ROOT diff --git a/core/imt/src/TImplicitMT.cxx b/core/imt/src/TImplicitMT.cxx index 338080babdad5..4c7d54f3f09bd 100644 --- a/core/imt/src/TImplicitMT.cxx +++ b/core/imt/src/TImplicitMT.cxx @@ -19,19 +19,18 @@ ////////////////////////////////////////////////////////////////////////// #include "TError.h" -#include "TThread.h" -#include "ROOT/TPoolManager.hxx" +#include "ROOT/RTaskArena.hxx" #include -static std::shared_ptr &R__GetPoolManagerMT() +static std::shared_ptr &R__GetTaskArena4IMT() { - static std::shared_ptr schedMT; - return schedMT; + static std::shared_ptr globalTaskArena; + return globalTaskArena; } extern "C" UInt_t ROOT_MT_GetThreadPoolSize() { - return ROOT::Internal::TPoolManager::GetPoolSize(); + return ROOT::Internal::GetGlobalTaskArena()->TaskArenaSize(); }; static bool &GetImplicitMTFlag() @@ -49,10 +48,7 @@ static std::atomic_int &GetParBranchProcessingCount() extern "C" void ROOT_TImplicitMT_EnableImplicitMT(UInt_t numthreads) { if (!GetImplicitMTFlag()) { - if (ROOT::Internal::TPoolManager::GetPoolSize() == 0) { - TThread::Initialize(); - } - R__GetPoolManagerMT() = ROOT::Internal::GetPoolManager(numthreads); + R__GetTaskArena4IMT() = ROOT::Internal::InitGlobalTaskArena(numthreads); GetImplicitMTFlag() = true; } else { ::Warning("ROOT_TImplicitMT_EnableImplicitMT", "Implicit multi-threading is already enabled"); @@ -63,7 +59,7 @@ extern "C" void ROOT_TImplicitMT_DisableImplicitMT() { if (GetImplicitMTFlag()) { GetImplicitMTFlag() = false; - R__GetPoolManagerMT().reset(); + R__GetTaskArena4IMT().reset(); } else { ::Warning("ROOT_TImplicitMT_DisableImplicitMT", "Implicit multi-threading is already disabled"); } diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index c2e3e0993773d..f647fc2c294f6 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -1,13 +1,9 @@ #include "ROOT/TThreadExecutor.hxx" -#include "ROOT/TTaskGroup.hxx" - #if !defined(_MSC_VER) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wshadow" #endif - #include "tbb/tbb.h" - #if !defined(_MSC_VER) #pragma GCC diagnostic pop #endif @@ -135,41 +131,40 @@ namespace ROOT { /// If the scheduler is active (e.g. because another TThreadExecutor is in flight, or ROOT::EnableImplicitMT() was /// called), work with the current pool of threads. /// If not, initialize the pool of threads, spawning nThreads. nThreads' default value, 0, initializes the - /// pool with as many logical threads as are available in the system (see NLogicalCores in TPoolManager.cxx). + /// pool with as many logical threads as are available in the system (see NLogicalCores in RTaskArenaWrapper.cxx). /// /// At construction time, TThreadExecutor automatically enables ROOT's thread-safety locks as per calling /// ROOT::EnableThreadSafety(). TThreadExecutor::TThreadExecutor(UInt_t nThreads) { - ROOT::EnableThreadSafety(); - - auto current = ROOT::Internal::TPoolManager::GetPoolSize(); - if (nThreads && current && (current != nThreads)) - { - Warning("TThreadExecutor", "There's already an active pool of threads. Proceeding with the current %d threads", current); - } - fSched = ROOT::Internal::GetPoolManager(nThreads); + fTaskArenaW = ROOT::Internal::InitGlobalTaskArena(nThreads); } void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, const std::function &f) { - tbb::this_task_arena::isolate([&]{ - tbb::parallel_for(start, end, step, f); + fTaskArenaW->Access()->execute([&]{ + tbb::this_task_arena::isolate([&]{ + tbb::parallel_for(start, end, step, f); + }); }); } double TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return ROOT::Internal::ParallelReduceHelper(objs, redfunc); + return fTaskArenaW->Access()->execute([&]{ + return ROOT::Internal::ParallelReduceHelper(objs, redfunc); + }); } float TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return ROOT::Internal::ParallelReduceHelper(objs, redfunc); + return fTaskArenaW->Access()->execute([&]{ + return ROOT::Internal::ParallelReduceHelper(objs, redfunc); + }); } unsigned TThreadExecutor::GetPoolSize(){ - return ROOT::Internal::TPoolManager::GetPoolSize(); + return fTaskArenaW->TaskArenaSize(); } } From 88a6de55105e9c7b82e8e9399844c3c70823877b Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 3 Jun 2020 16:30:42 +0000 Subject: [PATCH 02/20] initialize the task arena on instantiation --- core/imt/inc/ROOT/RTaskArena.hxx | 26 ++++-------- core/imt/src/RTaskArena.cxx | 71 ++++++++++++++++++++------------ core/imt/src/TImplicitMT.cxx | 4 +- core/imt/src/TThreadExecutor.cxx | 2 +- 4 files changed, 56 insertions(+), 47 deletions(-) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index 60b733c4fdc4f..b4447cd1c47ef 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -47,38 +47,30 @@ namespace Internal { //////////////////////////////////////////////////////////////////////////////// /// Wrapper for tbb::task_arena. /// -/// Necessary in order to keep tbb away from ROOT headers +/// Necessary in order to keep tbb away from ROOT headers. +/// This class is thought out to be used as a singleton. //////////////////////////////////////////////////////////////////////////////// class RTaskArenaWrapper { public: - RTaskArenaWrapper(); - unsigned TaskArenaSize(); + RTaskArenaWrapper(unsigned maxConcurrency = 0); + ~RTaskArenaWrapper(); // necessary to set size back to zero + static unsigned TaskArenaSize(); std::unique_ptr &Access(); private: std::unique_ptr fTBBArena; + static unsigned fNWorkers; }; //////////////////////////////////////////////////////////////////////////////// // Factory function returning a shared pointer to the instance of the global -// RTaskArenaWrapper. +// RTaskArenaWrapper. The task arena still needs to be initialized with a +// call to RTaskArena::Initialize(maxConcurrency) // // Allows for reinstantiation of the global RTaskArenaWrapper once all the // references to the previous one are gone and the object destroyed. //////////////////////////////////////////////////////////////////////////////// -std::shared_ptr GetGlobalTaskArena(); - -//////////////////////////////////////////////////////////////////////////////// -/// Initializes the global instance of tbb::task_arena and returns a shared_ptr to -/// its singleton wrapper -/// -/// * Always initializes with the available number of threads -/// * Can't be reinitialized -/// * Checks for CPU bandwidth control -/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads, -/// which is CPU affinity aware -//////////////////////////////////////////////////////////////////////////////// -std::shared_ptr InitGlobalTaskArena(unsigned maxConcurrency); +std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency = 0); } // namespace Internal } // namespace ROOT diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 9f4d93e6ca776..e336c07fd2f3f 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -78,24 +78,63 @@ static Int_t LogicalCPUBandwithControl() namespace ROOT{ namespace Internal { -RTaskArenaWrapper::RTaskArenaWrapper(): fTBBArena(new tbb::task_arena{}){} -unsigned RTaskArenaWrapper::TaskArenaSize() +//////////////////////////////////////////////////////////////////////////////// +/// Initializes the tbb::task_arena within RTaskArenaWrapper +/// +/// * Can't be reinitialized +/// * Checks for CPU bandwidth control and avoids oversubscribing +/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads, +/// which is CPU affinity aware +//////////////////////////////////////////////////////////////////////////////// +RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency): fTBBArena(new tbb::task_arena{}) { - return fTBBArena->is_active()? static_cast(fTBBArena->max_concurrency()) : 0u; + if (!fTBBArena->is_active()) { + unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state + maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; + unsigned bcCpus = LogicalCPUBandwithControl(); + if (maxConcurrency>bcCpus) { + Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", + bcCpus); + maxConcurrency = bcCpus; + } + fTBBArena->initialize(maxConcurrency); + fNWorkers = maxConcurrency; + ROOT::EnableThreadSafety(); + } else { + unsigned current = fTBBArena->max_concurrency(); + if (maxConcurrency && (current != maxConcurrency)) { + Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads", + current); + } + } +} + +RTaskArenaWrapper::~RTaskArenaWrapper() +{ + fNWorkers = 0u; } +unsigned RTaskArenaWrapper::fNWorkers = 0u; + +unsigned RTaskArenaWrapper::TaskArenaSize() +{ + return fNWorkers; +} +//////////////////////////////////////////////////////////////////////////////// +/// Provides access to the wrapped tbb::task_arena +//////////////////////////////////////////////////////////////////////////////// std::unique_ptr &RTaskArenaWrapper::Access() { return fTBBArena; } -std::shared_ptr GetGlobalTaskArena() +std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency) { static std::weak_ptr weak_GTAWrapper; if (weak_GTAWrapper.expired()) { - std::shared_ptr shared_GTAWrapper(new ROOT::Internal::RTaskArenaWrapper()); + std::shared_ptr shared_GTAWrapper(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency)); weak_GTAWrapper = shared_GTAWrapper; return weak_GTAWrapper.lock(); } @@ -103,27 +142,5 @@ std::shared_ptr GetGlobalTaskArena() return weak_GTAWrapper.lock(); } -std::shared_ptr InitGlobalTaskArena(unsigned maxConcurrency) -{ - auto globalTBBTaskArena = GetGlobalTaskArena(); - if (!globalTBBTaskArena->Access()->is_active()) { - unsigned tbbDefaultNumberThreads = globalTBBTaskArena->Access()->max_concurrency(); // not initialized, automatic state - maxConcurrency = maxConcurrency > 1 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; - unsigned bcCpus = LogicalCPUBandwithControl(); - auto taskArenaSize = std::min({maxConcurrency, bcCpus}); - globalTBBTaskArena->Access()->initialize(taskArenaSize); - ROOT::EnableThreadSafety(); - } else { - unsigned current = globalTBBTaskArena->Access()->max_concurrency(); - if (maxConcurrency && (current != maxConcurrency)) { - Warning("InitGlobalTaskArena", "There's already an active task arena. Proceeding with the current %d threads", - current); - } - } - - return globalTBBTaskArena; -} - - } // namespace Internal } // namespace ROOT diff --git a/core/imt/src/TImplicitMT.cxx b/core/imt/src/TImplicitMT.cxx index 4c7d54f3f09bd..8d92c7fa105aa 100644 --- a/core/imt/src/TImplicitMT.cxx +++ b/core/imt/src/TImplicitMT.cxx @@ -30,7 +30,7 @@ static std::shared_ptr &R__GetTaskArena4IMT() extern "C" UInt_t ROOT_MT_GetThreadPoolSize() { - return ROOT::Internal::GetGlobalTaskArena()->TaskArenaSize(); + return ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(); }; static bool &GetImplicitMTFlag() @@ -48,7 +48,7 @@ static std::atomic_int &GetParBranchProcessingCount() extern "C" void ROOT_TImplicitMT_EnableImplicitMT(UInt_t numthreads) { if (!GetImplicitMTFlag()) { - R__GetTaskArena4IMT() = ROOT::Internal::InitGlobalTaskArena(numthreads); + R__GetTaskArena4IMT() = ROOT::Internal::GetGlobalTaskArena(numthreads); GetImplicitMTFlag() = true; } else { ::Warning("ROOT_TImplicitMT_EnableImplicitMT", "Implicit multi-threading is already enabled"); diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index f647fc2c294f6..94cfb78f72a6e 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -137,7 +137,7 @@ namespace ROOT { /// ROOT::EnableThreadSafety(). TThreadExecutor::TThreadExecutor(UInt_t nThreads) { - fTaskArenaW = ROOT::Internal::InitGlobalTaskArena(nThreads); + fTaskArenaW = ROOT::Internal::GetGlobalTaskArena(nThreads); } void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, const std::function &f) From e5551e3ee5c3c587424090a1a8da76878efe6b8c Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 4 Jun 2020 08:31:19 +0000 Subject: [PATCH 03/20] make variables const --- core/imt/src/RTaskArena.cxx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index e336c07fd2f3f..1fe2632cafde4 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -90,9 +90,9 @@ namespace Internal { RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency): fTBBArena(new tbb::task_arena{}) { if (!fTBBArena->is_active()) { - unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state + const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; - unsigned bcCpus = LogicalCPUBandwithControl(); + const unsigned bcCpus = LogicalCPUBandwithControl(); if (maxConcurrency>bcCpus) { Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus); @@ -102,7 +102,7 @@ RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency): fTBBArena(new tbb fNWorkers = maxConcurrency; ROOT::EnableThreadSafety(); } else { - unsigned current = fTBBArena->max_concurrency(); + const unsigned current = fTBBArena->max_concurrency(); if (maxConcurrency && (current != maxConcurrency)) { Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads", current); From 7072d53af76c2936d774d2f64eb54394a9a7a59f Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 4 Jun 2020 09:00:46 +0000 Subject: [PATCH 04/20] provide access 2 the tbb::task arena instead of its wrapping unique_ptr --- core/imt/inc/ROOT/RTaskArena.hxx | 2 +- core/imt/src/RTaskArena.cxx | 6 +++--- core/imt/src/TThreadExecutor.cxx | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index b4447cd1c47ef..116cf62018916 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -55,7 +55,7 @@ public: RTaskArenaWrapper(unsigned maxConcurrency = 0); ~RTaskArenaWrapper(); // necessary to set size back to zero static unsigned TaskArenaSize(); - std::unique_ptr &Access(); + tbb::task_arena &Access(); private: std::unique_ptr fTBBArena; static unsigned fNWorkers; diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 1fe2632cafde4..8bf9b3cb6003f 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -35,7 +35,7 @@ /// root[] gTA->InitGlobalTaskArena(nWorkers) // Initialize the global arena and enable Thread Safety in ROOT /// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads) /// root[] gTA->Access() //std::unique_ptr to the internal tbb::task_arena for interacting directly with it (needed to call operations such as execute) -/// root[] root[] gTA->Access()->max_concurrency() // call to tbb::task_arena::max_concurrency() +/// root[] root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency() /// ~~~ /// ////////////////////////////////////////////////////////////////////////// @@ -124,9 +124,9 @@ unsigned RTaskArenaWrapper::TaskArenaSize() //////////////////////////////////////////////////////////////////////////////// /// Provides access to the wrapped tbb::task_arena //////////////////////////////////////////////////////////////////////////////// -std::unique_ptr &RTaskArenaWrapper::Access() +tbb::task_arena &RTaskArenaWrapper::Access() { - return fTBBArena; + return *fTBBArena; } diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index 94cfb78f72a6e..f1a633f0a7979 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -142,7 +142,7 @@ namespace ROOT { void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, const std::function &f) { - fTaskArenaW->Access()->execute([&]{ + fTaskArenaW->Access().execute([&]{ tbb::this_task_arena::isolate([&]{ tbb::parallel_for(start, end, step, f); }); @@ -151,14 +151,14 @@ namespace ROOT { double TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return fTaskArenaW->Access()->execute([&]{ + return fTaskArenaW->Access().execute([&]{ return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } float TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return fTaskArenaW->Access()->execute([&]{ + return fTaskArenaW->Access().execute([&]{ return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } From eec42ecb0d501b17c10ab57a24a8a419a250ec9e Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 4 Jun 2020 09:20:45 +0000 Subject: [PATCH 05/20] replace use of stat by ifstream only ...when checking for bandwith control --- core/imt/src/RTaskArena.cxx | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 8bf9b3cb6003f..2a1e7ada74542 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -3,7 +3,6 @@ #include "TROOT.h" #include "TThread.h" #include -#include #include #include "tbb/task_arena.h" @@ -53,18 +52,13 @@ static Int_t LogicalCPUBandwithControl() { #ifdef R__LINUX // Check for CFS bandwith control - std::ifstream f; - std::string quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); - struct stat buffer; - // Does the file exist? - if(stat(quotaFile.c_str(), &buffer) == 0) { - f.open(quotaFile); + std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file + if(f) { float cfs_quota; f>>cfs_quota; f.close(); if(cfs_quota > 0) { - std::string periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); - f.open(periodFile); + f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file float cfs_period; f>>cfs_period; f.close(); From b9633e61a9c6747e89239e398bf329ffb8ffe0b0 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 4 Jun 2020 13:27:43 +0000 Subject: [PATCH 06/20] avoid race conditions when retrieving the global task arena --- core/imt/src/RTaskArena.cxx | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 2a1e7ada74542..470a1e687144f 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -123,17 +123,14 @@ tbb::task_arena &RTaskArenaWrapper::Access() return *fTBBArena; } - std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency) { static std::weak_ptr weak_GTAWrapper; - if (weak_GTAWrapper.expired()) { - std::shared_ptr shared_GTAWrapper(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency)); - weak_GTAWrapper = shared_GTAWrapper; - return weak_GTAWrapper.lock(); - } - - return weak_GTAWrapper.lock(); + if (auto sp = weak_GTAWrapper.lock()) + return sp; + auto sp = std::make_shared(maxConcurrency); + weak_GTAWrapper = sp; + return sp; } } // namespace Internal From a11297791839823e5a4d5600614283d81a2d4a3d Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Mon, 8 Jun 2020 13:59:17 +0000 Subject: [PATCH 07/20] add a test for the global task arena --- core/imt/test/CMakeLists.txt | 4 +- core/imt/test/testRTaskArena.cxx | 168 +++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 core/imt/test/testRTaskArena.cxx diff --git a/core/imt/test/CMakeLists.txt b/core/imt/test/CMakeLists.txt index 6580547b62bf1..6ce2e178da3ea 100644 --- a/core/imt/test/CMakeLists.txt +++ b/core/imt/test/CMakeLists.txt @@ -4,6 +4,6 @@ # For the licensing terms see $ROOTSYS/LICENSE. # For the list of contributors see $ROOTSYS/README/CREDITS. -ROOT_ADD_UNITTEST_DIR(Imt Thread) +ROOT_ADD_UNITTEST_DIR(Imt Thread ${TBB_LIBRARIES}) -ROOT_ADD_GTEST(testImt testTFuture.cxx testTTaskGroup.cxx LIBRARIES Imt) +ROOT_ADD_GTEST(testImt testRTaskArena.cxx testTFuture.cxx testTTaskGroup.cxx LIBRARIES Imt ${TBB_LIBRARIES}) diff --git a/core/imt/test/testRTaskArena.cxx b/core/imt/test/testRTaskArena.cxx new file mode 100644 index 0000000000000..91ed330a3c1d9 --- /dev/null +++ b/core/imt/test/testRTaskArena.cxx @@ -0,0 +1,168 @@ +#include "TROOT.h" +#include "ROOT/RTaskArena.hxx" +#include "ROOT/TThreadExecutor.hxx" +#include +#include +#include +#include "gtest/gtest.h" +#include "tbb/task_arena.h" + + +#ifdef R__USE_IMT + +unsigned LogicalCPUBandwithControl() +{ +#ifdef R__LINUX + // Check for CFS bandwith control + std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file + if(f) { + float cfs_quota; + f>>cfs_quota; + f.close(); + if(cfs_quota > 0) { + f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file + float cfs_period; + f>>cfs_period; + f.close(); + return static_cast(std::ceil(cfs_quota/cfs_period)); + } + } +#endif + return std::thread::hardware_concurrency(); +} + +const unsigned maxConcurrency = LogicalCPUBandwithControl(); +std::mt19937 randGenerator(0); // seed the generator +std::uniform_int_distribution<> plausibleNCores(1, maxConcurrency); // define the range + + + +TEST(RTaskArena, Size0WhenNoInstance) +{ + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); +} + +TEST(RTaskArena, Construction) +{ + const unsigned nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); +} + +TEST(RTaskArena, DefaultConstruction) +{ + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); +} + +TEST(RTaskArena, Reconstruction) +{ + unsigned nCores; + { + nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); + } + + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); + + nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); +} + +TEST(RTaskArena, SingleInstance) +{ + const unsigned nCores = plausibleNCores(randGenerator); + auto gTAInstance1 = ROOT::Internal::GetGlobalTaskArena(nCores); + auto gTAInstance2 = ROOT::Internal::GetGlobalTaskArena(plausibleNCores(randGenerator)); + ASSERT_EQ(&(*gTAInstance1), &(*gTAInstance2)); +} + +TEST(RTaskArena, AccessWorkingTBBtaskArena) +{ + const unsigned nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + auto tbbTACores = gTAInstance->Access().max_concurrency(); + ASSERT_EQ(nCores, tbbTACores); +} + +TEST(RTaskArena, KeepSize) +{ + const unsigned nCores = plausibleNCores(randGenerator); + auto gTAInstance1 = ROOT::Internal::GetGlobalTaskArena(nCores); + auto gTAInstance2 = ROOT::Internal::GetGlobalTaskArena(plausibleNCores(randGenerator)); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); +} + +//////////////////////////////////////////////////////////////////////// +// Integration Tests + +TEST(RTaskArena, CorrectSizeIMT) +{ + auto gTAInstance1 = ROOT::Internal::GetGlobalTaskArena(); + ROOT::EnableImplicitMT(plausibleNCores(randGenerator)); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + ROOT::DisableImplicitMT(); +} + +TEST(RTaskArena, KeepSizeTThreadExecutor) +{ + const unsigned nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + ROOT::TThreadExecutor threadExecutor(plausibleNCores(randGenerator)); + ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); +} + +TEST(RTaskArena, InterleaveAndNest) +{ + unsigned nCores; + + // IMT + GTA + { + ROOT::EnableImplicitMT(); + nCores = plausibleNCores(randGenerator); + auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); + + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + + ROOT::DisableImplicitMT(); + } + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); + + // IMT + TThreadExecutor + { + ROOT::EnableImplicitMT(); + ROOT::TThreadExecutor threadExecutor(plausibleNCores(randGenerator)); + + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + + ROOT::DisableImplicitMT(); + } + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); + + // TThreadExecutor + IMT + { + ROOT::TThreadExecutor threadExecutor{}; + ROOT::EnableImplicitMT(plausibleNCores(randGenerator)); + + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + + ROOT::DisableImplicitMT(); + } + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); + + // Nested TThreadExecutor + { + ROOT::TThreadExecutor threadExecutor{}; + auto fcn = [](){ + ROOT::TThreadExecutor te(plausibleNCores(randGenerator)); + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + }; + threadExecutor.Foreach(fcn, 2); + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); + } + EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); +} + +#endif From 50e62ddb8412781211eba0bad26dcc7aed07b14b Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Mon, 8 Jun 2020 14:16:51 +0000 Subject: [PATCH 08/20] get the global task arena within a critical section --- core/imt/src/RTaskArena.cxx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 470a1e687144f..8adaa4e8478d7 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -3,6 +3,7 @@ #include "TROOT.h" #include "TThread.h" #include +#include #include #include "tbb/task_arena.h" @@ -126,6 +127,9 @@ tbb::task_arena &RTaskArenaWrapper::Access() std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency) { static std::weak_ptr weak_GTAWrapper; + + static std::mutex m; + const std::lock_guard lock{m}; if (auto sp = weak_GTAWrapper.lock()) return sp; auto sp = std::make_shared(maxConcurrency); From b456d7b8993af1c97a494d427d2afe1e9b61685f Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Tue, 9 Jun 2020 13:11:49 +0000 Subject: [PATCH 09/20] move warning to task arena acquisiton --- core/imt/src/RTaskArena.cxx | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 8adaa4e8478d7..fee6536397d47 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -84,25 +84,17 @@ namespace Internal { //////////////////////////////////////////////////////////////////////////////// RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency): fTBBArena(new tbb::task_arena{}) { - if (!fTBBArena->is_active()) { - const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state - maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; - const unsigned bcCpus = LogicalCPUBandwithControl(); - if (maxConcurrency>bcCpus) { - Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", - bcCpus); - maxConcurrency = bcCpus; - } - fTBBArena->initialize(maxConcurrency); - fNWorkers = maxConcurrency; - ROOT::EnableThreadSafety(); - } else { - const unsigned current = fTBBArena->max_concurrency(); - if (maxConcurrency && (current != maxConcurrency)) { - Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads", - current); - } + const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state + maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; + const unsigned bcCpus = LogicalCPUBandwithControl(); + if (maxConcurrency>bcCpus) { + Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", + bcCpus); + maxConcurrency = bcCpus; } + fTBBArena->initialize(maxConcurrency); + fNWorkers = maxConcurrency; + ROOT::EnableThreadSafety(); } RTaskArenaWrapper::~RTaskArenaWrapper() @@ -130,8 +122,13 @@ std::shared_ptr GetGlobalTaskArena(unsigned m static std::mutex m; const std::lock_guard lock{m}; - if (auto sp = weak_GTAWrapper.lock()) + if (auto sp = weak_GTAWrapper.lock()) { + if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) { + Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads", + sp->TaskArenaSize()); + } return sp; + } auto sp = std::make_shared(maxConcurrency); weak_GTAWrapper = sp; return sp; From 7a8b1c797efbb3601aa97b656968865f627295b6 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Tue, 9 Jun 2020 13:28:53 +0000 Subject: [PATCH 10/20] expose ROOT::Internal::LogicalCPUBandwithControl --- core/imt/inc/ROOT/RTaskArena.hxx | 11 +++++++++++ core/imt/src/RTaskArena.cxx | 15 +++------------ core/imt/test/testRTaskArena.cxx | 23 +---------------------- 3 files changed, 15 insertions(+), 34 deletions(-) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index 116cf62018916..c0da93b3eb6b7 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -44,6 +44,17 @@ using task_arena = interface7::task_arena; namespace ROOT { namespace Internal { +//////////////////////////////////////////////////////////////////////////////// +/// Returns the available number of logical cores. +/// +/// - Checks if there is CFS bandwidth control in place (linux, via cgroups, +/// assuming standard paths) +/// - Otherwise, returns the number of logical cores provided by +/// std::thread::hardware_concurrency() +//////////////////////////////////////////////////////////////////////////////// +int LogicalCPUBandwithControl(); + + //////////////////////////////////////////////////////////////////////////////// /// Wrapper for tbb::task_arena. /// diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index fee6536397d47..2470bd3261983 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -40,16 +40,10 @@ /// ////////////////////////////////////////////////////////////////////////// +namespace ROOT{ +namespace Internal { -//////////////////////////////////////////////////////////////////////////////// -/// Returns the available number of logical cores. -/// -/// - Checks if there is CFS bandwidth control in place (linux, via cgroups, -/// assuming standard paths) -/// - Otherwise, returns the number of logical cores provided by -/// std::thread::hardware_concurrency() -//////////////////////////////////////////////////////////////////////////////// -static Int_t LogicalCPUBandwithControl() +int LogicalCPUBandwithControl() { #ifdef R__LINUX // Check for CFS bandwith control @@ -70,9 +64,6 @@ static Int_t LogicalCPUBandwithControl() return std::thread::hardware_concurrency(); } -namespace ROOT{ -namespace Internal { - //////////////////////////////////////////////////////////////////////////////// /// Initializes the tbb::task_arena within RTaskArenaWrapper diff --git a/core/imt/test/testRTaskArena.cxx b/core/imt/test/testRTaskArena.cxx index 91ed330a3c1d9..a2b8ffb12e84d 100644 --- a/core/imt/test/testRTaskArena.cxx +++ b/core/imt/test/testRTaskArena.cxx @@ -10,28 +10,7 @@ #ifdef R__USE_IMT -unsigned LogicalCPUBandwithControl() -{ -#ifdef R__LINUX - // Check for CFS bandwith control - std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file - if(f) { - float cfs_quota; - f>>cfs_quota; - f.close(); - if(cfs_quota > 0) { - f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file - float cfs_period; - f>>cfs_period; - f.close(); - return static_cast(std::ceil(cfs_quota/cfs_period)); - } - } -#endif - return std::thread::hardware_concurrency(); -} - -const unsigned maxConcurrency = LogicalCPUBandwithControl(); +const unsigned maxConcurrency = ROOT::Internal::LogicalCPUBandwithControl(); std::mt19937 randGenerator(0); // seed the generator std::uniform_int_distribution<> plausibleNCores(1, maxConcurrency); // define the range From a4f51e6c1140116136534a03eea1cb4416e95e03 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Tue, 9 Jun 2020 15:25:05 +0000 Subject: [PATCH 11/20] improve comments and documentation --- core/imt/inc/ROOT/RTaskArena.hxx | 15 +++++++-------- core/imt/src/RTaskArena.cxx | 15 +++++++-------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index c0da93b3eb6b7..3ae36b778dee6 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -30,7 +30,7 @@ #ifndef R__USE_IMT // No need to error out for dictionaries. # if !defined(__ROOTCLING__) && !defined(G__DICTIONARY) -# error "Cannot use ROOT::Internal::RTaskArenaWrapper without defining R__USE_IMT." +# error "Cannot use ROOT::Internal::RTaskArenaWrapper if build option imt=OFF." # endif #else @@ -65,7 +65,7 @@ class RTaskArenaWrapper { public: RTaskArenaWrapper(unsigned maxConcurrency = 0); ~RTaskArenaWrapper(); // necessary to set size back to zero - static unsigned TaskArenaSize(); + static unsigned TaskArenaSize(); // A static getter lets us check for RTaskArenaWrapper's existence tbb::task_arena &Access(); private: std::unique_ptr fTBBArena; @@ -74,12 +74,11 @@ private: //////////////////////////////////////////////////////////////////////////////// -// Factory function returning a shared pointer to the instance of the global -// RTaskArenaWrapper. The task arena still needs to be initialized with a -// call to RTaskArena::Initialize(maxConcurrency) -// -// Allows for reinstantiation of the global RTaskArenaWrapper once all the -// references to the previous one are gone and the object destroyed. +/// Factory function returning a shared pointer to the instance of the global +/// RTaskArenaWrapper. +/// +/// Allows for reinstantiation of the global RTaskArenaWrapper once all the +/// references to the previous one are gone and the object destroyed. //////////////////////////////////////////////////////////////////////////////// std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency = 0); diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 2470bd3261983..4d5e80afd0dce 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -7,7 +7,6 @@ #include #include "tbb/task_arena.h" - ////////////////////////////////////////////////////////////////////////// /// /// \class ROOT::Internal::RTaskArenaWrapper @@ -31,11 +30,12 @@ /// /// #### Examples: /// ~~~{.cpp} -/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena() //get a shared_ptr to the global arena -/// root[] gTA->InitGlobalTaskArena(nWorkers) // Initialize the global arena and enable Thread Safety in ROOT +/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize +/// //it with nWorkers. Enable thread safety in ROOT /// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads) -/// root[] gTA->Access() //std::unique_ptr to the internal tbb::task_arena for interacting directly with it (needed to call operations such as execute) -/// root[] root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency() +/// root[] gTA->Access() //std::unique_ptr to the internal tbb::task_arena for interacting directly with it (needed to +/// //call operations such as execute) +/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency() /// ~~~ /// ////////////////////////////////////////////////////////////////////////// @@ -64,9 +64,8 @@ int LogicalCPUBandwithControl() return std::thread::hardware_concurrency(); } - //////////////////////////////////////////////////////////////////////////////// -/// Initializes the tbb::task_arena within RTaskArenaWrapper +/// Initializes the tbb::task_arena within RTaskArenaWrapper. /// /// * Can't be reinitialized /// * Checks for CPU bandwidth control and avoids oversubscribing @@ -100,7 +99,7 @@ unsigned RTaskArenaWrapper::TaskArenaSize() return fNWorkers; } //////////////////////////////////////////////////////////////////////////////// -/// Provides access to the wrapped tbb::task_arena +/// Provides access to the wrapped tbb::task_arena. //////////////////////////////////////////////////////////////////////////////// tbb::task_arena &RTaskArenaWrapper::Access() { From b0c094300e6f720b955c493daeda59ca9133c9e2 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 10 Jun 2020 07:12:56 +0000 Subject: [PATCH 12/20] apply clang format --- core/imt/src/RTaskArena.cxx | 21 ++++++++++---------- core/imt/src/TThreadExecutor.cxx | 34 ++++++++++++++------------------ core/imt/test/testRTaskArena.cxx | 19 ++++++++---------- 3 files changed, 33 insertions(+), 41 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 4d5e80afd0dce..b92ff96e9cb34 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -40,7 +40,7 @@ /// ////////////////////////////////////////////////////////////////////////// -namespace ROOT{ +namespace ROOT { namespace Internal { int LogicalCPUBandwithControl() @@ -48,16 +48,16 @@ int LogicalCPUBandwithControl() #ifdef R__LINUX // Check for CFS bandwith control std::ifstream f("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file - if(f) { + if (f) { float cfs_quota; - f>>cfs_quota; + f >> cfs_quota; f.close(); - if(cfs_quota > 0) { + if (cfs_quota > 0) { f.open("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file float cfs_period; - f>>cfs_period; + f >> cfs_period; f.close(); - return static_cast(std::ceil(cfs_quota/cfs_period)); + return static_cast(std::ceil(cfs_quota / cfs_period)); } } #endif @@ -72,14 +72,13 @@ int LogicalCPUBandwithControl() /// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads, /// which is CPU affinity aware //////////////////////////////////////////////////////////////////////////////// -RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency): fTBBArena(new tbb::task_arena{}) +RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new tbb::task_arena{}) { const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; const unsigned bcCpus = LogicalCPUBandwithControl(); - if (maxConcurrency>bcCpus) { - Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", - bcCpus); + if (maxConcurrency > bcCpus) { + Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus); maxConcurrency = bcCpus; } fTBBArena->initialize(maxConcurrency); @@ -115,7 +114,7 @@ std::shared_ptr GetGlobalTaskArena(unsigned m if (auto sp = weak_GTAWrapper.lock()) { if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) { Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %d threads", - sp->TaskArenaSize()); + sp->TaskArenaSize()); } return sp; } diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index f1a633f0a7979..a536ab98013f0 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -126,18 +126,18 @@ static T ParallelReduceHelper(const std::vector &objs, const std::function &f) @@ -151,20 +151,16 @@ namespace ROOT { double TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return fTaskArenaW->Access().execute([&]{ - return ROOT::Internal::ParallelReduceHelper(objs, redfunc); - }); + return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } float TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { - return fTaskArenaW->Access().execute([&]{ - return ROOT::Internal::ParallelReduceHelper(objs, redfunc); - }); + return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } unsigned TThreadExecutor::GetPoolSize(){ - return fTaskArenaW->TaskArenaSize(); + return fTaskArenaW->TaskArenaSize(); } } diff --git a/core/imt/test/testRTaskArena.cxx b/core/imt/test/testRTaskArena.cxx index a2b8ffb12e84d..74585a8ba3747 100644 --- a/core/imt/test/testRTaskArena.cxx +++ b/core/imt/test/testRTaskArena.cxx @@ -7,15 +7,12 @@ #include "gtest/gtest.h" #include "tbb/task_arena.h" - #ifdef R__USE_IMT const unsigned maxConcurrency = ROOT::Internal::LogicalCPUBandwithControl(); -std::mt19937 randGenerator(0); // seed the generator +std::mt19937 randGenerator(0); // seed the generator std::uniform_int_distribution<> plausibleNCores(1, maxConcurrency); // define the range - - TEST(RTaskArena, Size0WhenNoInstance) { ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), 0u); @@ -23,7 +20,7 @@ TEST(RTaskArena, Size0WhenNoInstance) TEST(RTaskArena, Construction) { - const unsigned nCores = plausibleNCores(randGenerator); + const unsigned nCores = plausibleNCores(randGenerator); auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); } @@ -52,7 +49,7 @@ TEST(RTaskArena, Reconstruction) TEST(RTaskArena, SingleInstance) { - const unsigned nCores = plausibleNCores(randGenerator); + const unsigned nCores = plausibleNCores(randGenerator); auto gTAInstance1 = ROOT::Internal::GetGlobalTaskArena(nCores); auto gTAInstance2 = ROOT::Internal::GetGlobalTaskArena(plausibleNCores(randGenerator)); ASSERT_EQ(&(*gTAInstance1), &(*gTAInstance2)); @@ -60,7 +57,7 @@ TEST(RTaskArena, SingleInstance) TEST(RTaskArena, AccessWorkingTBBtaskArena) { - const unsigned nCores = plausibleNCores(randGenerator); + const unsigned nCores = plausibleNCores(randGenerator); auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); auto tbbTACores = gTAInstance->Access().max_concurrency(); ASSERT_EQ(nCores, tbbTACores); @@ -68,7 +65,7 @@ TEST(RTaskArena, AccessWorkingTBBtaskArena) TEST(RTaskArena, KeepSize) { - const unsigned nCores = plausibleNCores(randGenerator); + const unsigned nCores = plausibleNCores(randGenerator); auto gTAInstance1 = ROOT::Internal::GetGlobalTaskArena(nCores); auto gTAInstance2 = ROOT::Internal::GetGlobalTaskArena(plausibleNCores(randGenerator)); ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); @@ -87,7 +84,7 @@ TEST(RTaskArena, CorrectSizeIMT) TEST(RTaskArena, KeepSizeTThreadExecutor) { - const unsigned nCores = plausibleNCores(randGenerator); + const unsigned nCores = plausibleNCores(randGenerator); auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(nCores); ROOT::TThreadExecutor threadExecutor(plausibleNCores(randGenerator)); ASSERT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), nCores); @@ -134,10 +131,10 @@ TEST(RTaskArena, InterleaveAndNest) // Nested TThreadExecutor { ROOT::TThreadExecutor threadExecutor{}; - auto fcn = [](){ + auto fcn = []() { ROOT::TThreadExecutor te(plausibleNCores(randGenerator)); EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); - }; + }; threadExecutor.Foreach(fcn, 2); EXPECT_EQ(ROOT::Internal::RTaskArenaWrapper::TaskArenaSize(), maxConcurrency); } From a30e9a5910bcf6cb676dd8678b88243ce7568046 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 10 Jun 2020 07:46:40 +0000 Subject: [PATCH 13/20] make RtaskArenaWrapper constructor private ...and only allow RTaskarenaWrapper to be constructed from GetGlobalTaskArena --- core/imt/inc/ROOT/RTaskArena.hxx | 3 ++- core/imt/src/RTaskArena.cxx | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index 3ae36b778dee6..e976445885ca7 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -63,11 +63,12 @@ int LogicalCPUBandwithControl(); //////////////////////////////////////////////////////////////////////////////// class RTaskArenaWrapper { public: - RTaskArenaWrapper(unsigned maxConcurrency = 0); ~RTaskArenaWrapper(); // necessary to set size back to zero static unsigned TaskArenaSize(); // A static getter lets us check for RTaskArenaWrapper's existence tbb::task_arena &Access(); private: + RTaskArenaWrapper(unsigned maxConcurrency = 0); + friend std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency); std::unique_ptr fTBBArena; static unsigned fNWorkers; }; diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index b92ff96e9cb34..5fcb44ba1ee38 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -118,7 +118,7 @@ std::shared_ptr GetGlobalTaskArena(unsigned m } return sp; } - auto sp = std::make_shared(maxConcurrency); + std::shared_ptr sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency)); weak_GTAWrapper = sp; return sp; } From f5d275a50f944a1b16b8f143b766b106e0ca56cc Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 11 Jun 2020 05:43:32 +0000 Subject: [PATCH 14/20] doublecheck documentation ..for things that were true in TPoolManager but not anymore --- core/imt/src/RTaskArena.cxx | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 5fcb44ba1ee38..477dfc932b6ab 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -14,19 +14,13 @@ /// \brief Wrapper over tbb::task_arena /// /// This class is a wrapper over tbb::task_arena, in order to keep -/// TBB away from ROOT's headers. We keep a single global instance, -/// obtained with `ROOT::Internal::GetGlobalTaskArena()`, to be used by any -/// parallel ROOT class with TBB as a backend. This has several advantages: +/// TBB away from ROOT's headers. We keep a single global instance to be +/// used by any parallel ROOT class with TBB as a backend. /// -/// - Provides a unique interface to the TBB scheduler: TThreadExecutor, -/// IMT and any class relying on TBB will get a pointer to the scheduler -/// through `ROOT::Internal::GetGlobalTaskArena()`, which will return a -/// reference to the only pointer to the TBB scheduler that will be -/// active in any ROOT Process -/// - Solves multiple undefined behaviors. Guaranteeing that all classes -/// use the same task arena avoids interferences and undefined behavior -/// by providing a single instance of the tbb::task_arena and automated -/// bookkeeping, instantiation and destruction. +/// TThreadExecutor, IMT and any class relying on TBB will get a pointer +/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which +/// will return areference to the only pointer to the TBB scheduler that +/// will be active in any ROOT Process. /// /// #### Examples: /// ~~~{.cpp} From 7917be81eb41ad3345eeaab544c7e5e250120364 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Thu, 11 Jun 2020 07:45:18 +0000 Subject: [PATCH 15/20] fix indentation --- core/imt/src/TThreadExecutor.cxx | 51 ++++++++++++++++---------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index a536ab98013f0..4e97e3294af00 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -115,16 +115,13 @@ static T ParallelReduceHelper(const std::vector &objs, const std::function &f) - { - fTaskArenaW->Access().execute([&]{ - tbb::this_task_arena::isolate([&]{ - tbb::parallel_for(start, end, step, f); - }); - }); - } +} - double TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) - { - return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); - } +void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, + const std::function &f) +{ + fTaskArenaW->Access().execute([&] { + tbb::this_task_arena::isolate([&] { + tbb::parallel_for(start, end, step, f); + }); + }); +} - float TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) - { - return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); - } +double TThreadExecutor::ParallelReduce(const std::vector &objs, + const std::function &redfunc) +{ + return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); +} - unsigned TThreadExecutor::GetPoolSize(){ - return fTaskArenaW->TaskArenaSize(); - } +float TThreadExecutor::ParallelReduce(const std::vector &objs, + const std::function &redfunc) +{ + return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); +} +unsigned TThreadExecutor::GetPoolSize() +{ + return fTaskArenaW->TaskArenaSize(); } + +} // namespace ROOT From 3f472becf654c238356de98d563b0a9f02418015 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 20 Jan 2021 14:44:43 +0100 Subject: [PATCH 16/20] remove TPoolManager from ROOT --- core/imt/inc/LinkDef.h | 1 - core/imt/src/TPoolManager.cxx | 107 ---------------------------------- 2 files changed, 108 deletions(-) delete mode 100644 core/imt/src/TPoolManager.cxx diff --git a/core/imt/inc/LinkDef.h b/core/imt/inc/LinkDef.h index 2e1588854844d..4ece2a5e58f91 100644 --- a/core/imt/inc/LinkDef.h +++ b/core/imt/inc/LinkDef.h @@ -7,7 +7,6 @@ // Only for the autoload, autoparse. No IO of these classes is foreseen! // Exclude in case ROOT does not have IMT support #ifdef R__USE_IMT -#pragma link C++ class ROOT::Internal::TPoolManager-; #pragma link C++ class ROOT::TThreadExecutor-; #pragma link C++ class ROOT::Experimental::TTaskGroup-; #endif diff --git a/core/imt/src/TPoolManager.cxx b/core/imt/src/TPoolManager.cxx deleted file mode 100644 index 53b41da85bc35..0000000000000 --- a/core/imt/src/TPoolManager.cxx +++ /dev/null @@ -1,107 +0,0 @@ -#include "ROOT/TPoolManager.hxx" -#include "TError.h" -#include "TROOT.h" -#include -#include -#ifdef R__LINUX -#include -#include -#endif -#include "tbb/task_scheduler_init.h" - - -//////////////////////////////////////////////////////////////////////////////// -/// Returns the available number of logical cores. -/// -/// - Checks if there is CFS bandwith control in place (linux, via cgroups, -/// assuming standard paths) -/// - Otherwise, returns the number of logical cores provided by tbb by default. -/// This is processor affinity aware, at least in Linux. -//////////////////////////////////////////////////////////////////////////////// - - -namespace ROOT { - - namespace Internal { - - //Returns the available number of logical cores. - // - Checks if there is CFS bandwith control in place (linux, via cgroups, - // assuming standard paths) - // - Otherwise, returns the number of logical cores provided by tbb by default. - // This is processor affinity aware, at least in Linux. - Int_t NLogicalCores() - { - #ifdef R__LINUX - // Check for CFS bandwith control - std::ifstream f; - std::string quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); - struct stat buffer; - // Does the file exist? - if(stat(quotaFile.c_str(), &buffer) == 0) { - f.open(quotaFile); - float cfs_quota; - f>>cfs_quota; - f.close(); - if(cfs_quota > 0) { - std::string periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); - f.open(periodFile); - float cfs_period; - f>>cfs_period; - f.close(); - return static_cast(std::ceil(cfs_quota/cfs_period)); - } - } - #endif - return tbb::task_scheduler_init::default_num_threads(); - } - - //Returns the weak_ptr reflecting a shared_ptr to the only instance of the Pool Manager. - //This will allow to check if the shared_ptr is still alive, solving the dangling pointer problem. - std::weak_ptr &GetWP() - { - static std::weak_ptr weak_sched; - return weak_sched; - } - - UInt_t TPoolManager::fgPoolSize = 0; - - TPoolManager::TPoolManager(UInt_t nThreads): fSched(new tbb::task_scheduler_init(tbb::task_scheduler_init::deferred)) - { - //Is it there another instance of the tbb scheduler running? - if (fSched->is_active()) { - mustDelete = false; - } - - nThreads = nThreads != 0 ? nThreads : NLogicalCores(); - fSched ->initialize(nThreads); - fgPoolSize = nThreads; - }; - - TPoolManager::~TPoolManager() - { - //Only terminate the tbb scheduler if there was not another instance already - // running when the constructor was called. - if (mustDelete) { - fSched->terminate(); - fgPoolSize = 0; - } - } - - //Number of threads the PoolManager has been initialized with. - UInt_t TPoolManager::GetPoolSize() - { - return fgPoolSize; - } - - //Factory function returning a shared pointer to the only instance of the PoolManager. - std::shared_ptr GetPoolManager(UInt_t nThreads) - { - if (GetWP().expired()) { - std::shared_ptr shared(new TPoolManager(nThreads)); - GetWP() = shared; - return GetWP().lock(); - } - return GetWP().lock(); - } - } -} From c674f6ec9246619b94652e3dea6e3401f8f48585 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Wed, 20 Jan 2021 09:50:20 +0100 Subject: [PATCH 17/20] fix missing include in TTreeCacheUnzip --- tree/tree/src/TTreeCacheUnzip.cxx | 1 + 1 file changed, 1 insertion(+) diff --git a/tree/tree/src/TTreeCacheUnzip.cxx b/tree/tree/src/TTreeCacheUnzip.cxx index 39cc5c01295aa..a48d707176998 100644 --- a/tree/tree/src/TTreeCacheUnzip.cxx +++ b/tree/tree/src/TTreeCacheUnzip.cxx @@ -26,6 +26,7 @@ A TTreeCache which exploits parallelized decompression of its own content. #include "TFile.h" #include "TMath.h" #include "TMutex.h" +#include "TROOT.h" #include "ROOT/RMakeUnique.hxx" #ifdef R__USE_IMT From 4b4880f8bfaabfb32632cc4d583328e544892364 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Mon, 16 Nov 2020 09:51:43 +0100 Subject: [PATCH 18/20] warn on tbb::global_control interferences warn when the number of threads set by the user is limited at runtime by tbb::global_control. Fix for github issue #6363: https://github.com/root-project/root/issues/6363 --- core/imt/src/RTaskArena.cxx | 5 +++++ core/imt/src/TThreadExecutor.cxx | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 477dfc932b6ab..883cee96d870f 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -6,6 +6,7 @@ #include #include #include "tbb/task_arena.h" +#include "tbb/global_control.h" ////////////////////////////////////////////////////////////////////////// /// @@ -75,6 +76,10 @@ RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new tb Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus); maxConcurrency = bcCpus; } + if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) { + Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers" + "from this task arena available for execution."); + } fTBBArena->initialize(maxConcurrency); fNWorkers = maxConcurrency; ROOT::EnableThreadSafety(); diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index 4e97e3294af00..5a07159178245 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -140,6 +140,12 @@ TThreadExecutor::TThreadExecutor(UInt_t nThreads) void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, const std::function &f) { + if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) { + Warning("TThreadExecutor::ParallelFor", + "tbb::global_control is limiting the number of parallel workers." + " Proceeding with %zu threads this time", + tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)); + } fTaskArenaW->Access().execute([&] { tbb::this_task_arena::isolate([&] { tbb::parallel_for(start, end, step, f); @@ -150,12 +156,24 @@ void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned double TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { + if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) { + Warning("TThreadExecutor::ParallelReduce", + "tbb::global_control is limiting the number of parallel workers." + " Proceeding with %zu threads this time", + tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)); + } return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } float TThreadExecutor::ParallelReduce(const std::vector &objs, const std::function &redfunc) { + if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) { + Warning("TThreadExecutor::ParallelReduce", + "tbb::global_control is limiting the number of parallel workers." + " Proceeding with %zu threads this time", + tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)); + } return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper(objs, redfunc); }); } From d3d3b3fc8f8b8920130c92275418f6ee42c2fa20 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Mon, 18 Jan 2021 10:56:12 +0100 Subject: [PATCH 19/20] set TBB_PREVIEW_GLOBAL_CONTROL for older tbb versions --- core/imt/src/RTaskArena.cxx | 1 + core/imt/src/TThreadExecutor.cxx | 2 ++ 2 files changed, 3 insertions(+) diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index 883cee96d870f..b60b33e3cdcee 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -6,6 +6,7 @@ #include #include #include "tbb/task_arena.h" +#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4 #include "tbb/global_control.h" ////////////////////////////////////////////////////////////////////////// diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index 5a07159178245..f84987f52c1b5 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -4,6 +4,8 @@ #pragma GCC diagnostic ignored "-Wshadow" #endif #include "tbb/tbb.h" +#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4 +#include "tbb/global_control.h" #if !defined(_MSC_VER) #pragma GCC diagnostic pop #endif From fa15e9d298bdfbf354c00ff2ca806ab87cc5d3e5 Mon Sep 17 00:00:00 2001 From: Xavier Valls Pla Date: Tue, 19 Jan 2021 10:47:13 +0100 Subject: [PATCH 20/20] remove dependency on tbb interfaces from RTaskArena Introduce yet another layer of abstraction in ROpaqueTaskArena, a class inheriting from tbb::task_arena that will allow us to keep tbb hidden from ROOT interfaces while solving the issue of having to forward-declare tbb::task_arena in an interface-dependent way --- core/imt/inc/ROOT/RTaskArena.hxx | 11 +++++------ core/imt/src/ROpaqueTaskArena.hxx | 5 +++++ core/imt/src/RTaskArena.cxx | 5 +++-- core/imt/src/TThreadExecutor.cxx | 1 + core/imt/test/testRTaskArena.cxx | 2 +- 5 files changed, 15 insertions(+), 9 deletions(-) create mode 100644 core/imt/src/ROpaqueTaskArena.hxx diff --git a/core/imt/inc/ROOT/RTaskArena.hxx b/core/imt/inc/ROOT/RTaskArena.hxx index e976445885ca7..5fef2b78e248c 100644 --- a/core/imt/inc/ROOT/RTaskArena.hxx +++ b/core/imt/inc/ROOT/RTaskArena.hxx @@ -36,12 +36,11 @@ /// tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow /// to forward declare tbb::task_arena without forward declaring tbb::interface7 -namespace tbb{ -namespace interface7{class task_arena;} -using task_arena = interface7::task_arena; -} namespace ROOT { + +class ROpaqueTaskArena; + namespace Internal { //////////////////////////////////////////////////////////////////////////////// @@ -65,11 +64,11 @@ class RTaskArenaWrapper { public: ~RTaskArenaWrapper(); // necessary to set size back to zero static unsigned TaskArenaSize(); // A static getter lets us check for RTaskArenaWrapper's existence - tbb::task_arena &Access(); + ROOT::ROpaqueTaskArena &Access(); private: RTaskArenaWrapper(unsigned maxConcurrency = 0); friend std::shared_ptr GetGlobalTaskArena(unsigned maxConcurrency); - std::unique_ptr fTBBArena; + std::unique_ptr fTBBArena; static unsigned fNWorkers; }; diff --git a/core/imt/src/ROpaqueTaskArena.hxx b/core/imt/src/ROpaqueTaskArena.hxx new file mode 100644 index 0000000000000..6da5a5c68da1f --- /dev/null +++ b/core/imt/src/ROpaqueTaskArena.hxx @@ -0,0 +1,5 @@ +#include "tbb/task_arena.h" + +namespace ROOT { +class ROpaqueTaskArena: public tbb::task_arena {}; +} diff --git a/core/imt/src/RTaskArena.cxx b/core/imt/src/RTaskArena.cxx index b60b33e3cdcee..ecde842daf503 100644 --- a/core/imt/src/RTaskArena.cxx +++ b/core/imt/src/RTaskArena.cxx @@ -1,4 +1,5 @@ #include "ROOT/RTaskArena.hxx" +#include "ROpaqueTaskArena.hxx" #include "TError.h" #include "TROOT.h" #include "TThread.h" @@ -68,7 +69,7 @@ int LogicalCPUBandwithControl() /// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads, /// which is CPU affinity aware //////////////////////////////////////////////////////////////////////////////// -RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new tbb::task_arena{}) +RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{}) { const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads; @@ -100,7 +101,7 @@ unsigned RTaskArenaWrapper::TaskArenaSize() //////////////////////////////////////////////////////////////////////////////// /// Provides access to the wrapped tbb::task_arena. //////////////////////////////////////////////////////////////////////////////// -tbb::task_arena &RTaskArenaWrapper::Access() +ROOT::ROpaqueTaskArena &RTaskArenaWrapper::Access() { return *fTBBArena; } diff --git a/core/imt/src/TThreadExecutor.cxx b/core/imt/src/TThreadExecutor.cxx index f84987f52c1b5..3f216168a9ffa 100644 --- a/core/imt/src/TThreadExecutor.cxx +++ b/core/imt/src/TThreadExecutor.cxx @@ -1,4 +1,5 @@ #include "ROOT/TThreadExecutor.hxx" +#include "ROpaqueTaskArena.hxx" #if !defined(_MSC_VER) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wshadow" diff --git a/core/imt/test/testRTaskArena.cxx b/core/imt/test/testRTaskArena.cxx index 74585a8ba3747..1ee2e2e980109 100644 --- a/core/imt/test/testRTaskArena.cxx +++ b/core/imt/test/testRTaskArena.cxx @@ -1,11 +1,11 @@ #include "TROOT.h" #include "ROOT/RTaskArena.hxx" #include "ROOT/TThreadExecutor.hxx" +#include "../src/ROpaqueTaskArena.hxx" #include #include #include #include "gtest/gtest.h" -#include "tbb/task_arena.h" #ifdef R__USE_IMT