Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport TBB deprecation and related changes to 6-22 #7065

Merged
merged 20 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/imt/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ target_link_libraries(Imt PRIVATE Thread INTERFACE Core)
if(imt)
ROOT_GENERATE_DICTIONARY(G__Imt STAGE1
ROOT/TFuture.hxx
ROOT/TPoolManager.hxx
ROOT/TTaskGroup.hxx
ROOT/RTaskArena.hxx
ROOT/TThreadExecutor.hxx
LINKDEF
LinkDef.h
Expand All @@ -41,8 +41,8 @@ if(imt)

# G__Imt.cxx is automatically added by ROOT_GENERATE_DICTIONARY()
target_sources(Imt PRIVATE
src/RTaskArena.cxx
src/TImplicitMT.cxx
src/TPoolManager.cxx
src/TThreadExecutor.cxx
)

Expand Down
1 change: 0 additions & 1 deletion core/imt/inc/LinkDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// Only for the autoload, autoparse. No IO of these classes is foreseen!
// Exclude in case ROOT does not have IMT support
#ifdef R__USE_IMT
#pragma link C++ class ROOT::Internal::TPoolManager-;
#pragma link C++ class ROOT::TThreadExecutor-;
#pragma link C++ class ROOT::Experimental::TTaskGroup-;
#endif
Expand Down
89 changes: 89 additions & 0 deletions core/imt/inc/ROOT/RTaskArena.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// @(#)root/thread:$Id$
// Author: Xavier Valls Pla 08/05/20
//
/*************************************************************************
* Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/

//////////////////////////////////////////////////////////////////////////
// //
// RTaskArena //
// //
// This file implements the method to initialize and retrieve ROOT's //
// global task arena, together with a method to check for active //
// CPU bandwidth control, and a class to wrap the tbb task arena with //
// the purpose of keeping tbb off the installed headers //
// //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_RTaskArena
#define ROOT_RTaskArena

#include "RConfigure.h"
#include <memory>

// exclude in case ROOT does not have IMT support
#ifndef R__USE_IMT
// No need to error out for dictionaries.
# if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
# error "Cannot use ROOT::Internal::RTaskArenaWrapper if build option imt=OFF."
# endif
#else

/// tbb::task_arena is an alias of tbb::interface7::task_arena, so it cannot be
/// forward declared without also forward declaring tbb::interface7

namespace ROOT {

class ROpaqueTaskArena;

namespace Internal {

////////////////////////////////////////////////////////////////////////////////
/// Returns the available number of logical cores.
///
/// - Checks if there is CFS bandwidth control in place (linux, via cgroups,
/// assuming standard paths)
/// - Otherwise, returns the number of logical cores provided by
/// std::thread::hardware_concurrency()
///
/// \return The CFS quota/period ratio rounded up to the next integer when a
/// positive quota is configured, otherwise the value reported by
/// std::thread::hardware_concurrency().
////////////////////////////////////////////////////////////////////////////////
int LogicalCPUBandwithControl();


////////////////////////////////////////////////////////////////////////////////
/// Wrapper for tbb::task_arena.
///
/// Necessary in order to keep tbb away from ROOT headers.
/// This class is designed to be used as a singleton: its constructor is private
/// and instances are created by the friend factory GetGlobalTaskArena().
////////////////////////////////////////////////////////////////////////////////
class RTaskArenaWrapper {
public:
~RTaskArenaWrapper(); // necessary to set size back to zero
static unsigned TaskArenaSize(); // A static getter lets us check for RTaskArenaWrapper's existence
/// Returns a reference to the wrapped arena; the type is opaque in this header.
ROOT::ROpaqueTaskArena &Access();
private:
/// Private: construction goes through GetGlobalTaskArena() only.
RTaskArenaWrapper(unsigned maxConcurrency = 0);
friend std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency);
std::unique_ptr<ROOT::ROpaqueTaskArena> fTBBArena; ///< Owning pointer to the wrapped arena
static unsigned fNWorkers; ///< Size the arena was initialized with; reset to 0 by the destructor
};


////////////////////////////////////////////////////////////////////////////////
/// Factory function returning a shared pointer to the instance of the global
/// RTaskArenaWrapper.
///
/// Allows for reinstantiation of the global RTaskArenaWrapper once all the
/// references to the previous one are gone and the object destroyed.
///
/// \param maxConcurrency Requested number of workers for a newly created arena;
/// 0 (the default) requests the default number of threads. If an arena is
/// already alive with a different size, the request is ignored and a
/// warning is emitted.
/// \return Shared pointer to the global RTaskArenaWrapper.
////////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency = 0);

} // namespace Internal
} // namespace ROOT

#endif // R__USE_IMT
#endif // ROOT_RTaskArena
77 changes: 0 additions & 77 deletions core/imt/inc/ROOT/TPoolManager.hxx

This file was deleted.

6 changes: 3 additions & 3 deletions core/imt/inc/ROOT/TThreadExecutor.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
#else

#include "ROOT/TExecutor.hxx"
#include "ROOT/TPoolManager.hxx"
#include "TROOT.h"
#include "RTaskArena.hxx"
#include "TError.h"
#include <functional>
#include <memory>
#include <numeric>


namespace ROOT {

class TThreadExecutor: public TExecutor<TThreadExecutor> {
Expand Down Expand Up @@ -104,7 +104,7 @@ namespace ROOT {
template<class T, class R>
auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));

std::shared_ptr<ROOT::Internal::TPoolManager> fSched = nullptr;
std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> fTaskArenaW = nullptr;
};

/************ TEMPLATE METHODS IMPLEMENTATION ******************/
Expand Down
5 changes: 5 additions & 0 deletions core/imt/src/ROpaqueTaskArena.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#ifndef ROOT_ROpaqueTaskArena
#define ROOT_ROpaqueTaskArena

#include "tbb/task_arena.h"

namespace ROOT {
/// Concrete definition of the opaque arena type that ROOT's public headers only
/// forward declare. Deriving from tbb::task_arena gives a ROOT-owned class name
/// that can be forward declared without exposing any TBB header.
class ROpaqueTaskArena: public tbb::task_arena {};
}

#endif // ROOT_ROpaqueTaskArena
128 changes: 128 additions & 0 deletions core/imt/src/RTaskArena.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#include "ROOT/RTaskArena.hxx"
#include "ROpaqueTaskArena.hxx"
#include "TError.h"
#include "TROOT.h"
#include "TThread.h"
#include <algorithm>
#include <cmath>
#include <fstream>
#include <mutex>
#include <thread>
#include "tbb/task_arena.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::Internal::RTaskArenaWrapper
/// \ingroup Parallelism
/// \brief Wrapper over tbb::task_arena
///
/// This class is a wrapper over tbb::task_arena, in order to keep
/// TBB away from ROOT's headers. We keep a single global instance to be
/// used by any parallel ROOT class with TBB as a backend.
///
/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
/// will return a reference to the only pointer to the TBB scheduler that
/// will be active in any ROOT Process.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
/// //it with nWorkers. Enable thread safety in ROOT
/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
/// root[] gTA->Access() // reference to the internal tbb::task_arena for interacting directly with it (needed to
/// // call operations such as execute)
/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
/// ~~~
///
//////////////////////////////////////////////////////////////////////////

namespace ROOT {
namespace Internal {

////////////////////////////////////////////////////////////////////////////////
/// Returns the number of logical CPUs available to this process.
///
/// On Linux (R__LINUX), the cgroup CFS quota and period files are consulted.
/// If both can be read and both are positive, the result is
/// ceil(quota / period). In every other case (no quota file, quota not
/// positive, period file missing, unreadable or not positive) the function
/// falls back to std::thread::hardware_concurrency().
///
/// \return Number of usable logical CPUs.
////////////////////////////////////////////////////////////////////////////////
int LogicalCPUBandwithControl()
{
#ifdef R__LINUX
   // Check for CFS bandwidth control
   std::ifstream quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
   float cfs_quota = 0.f;
   if (quotaFile >> cfs_quota && cfs_quota > 0) {
      std::ifstream periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
      float cfs_period = 0.f;
      // Only trust the ratio if the period was actually read and is positive:
      // dividing by a missing/zero period would produce inf/NaN, and converting
      // that to int is undefined behavior.
      if (periodFile >> cfs_period && cfs_period > 0)
         return static_cast<int>(std::ceil(cfs_quota / cfs_period));
   }
#endif
   return std::thread::hardware_concurrency();
}

////////////////////////////////////////////////////////////////////////////////
/// Initializes the tbb::task_arena within RTaskArenaWrapper.
///
/// * Can't be reinitialized
/// * Checks for CPU bandwidth control and avoids oversubscribing
/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
/// which is CPU affinity aware
///
/// \param maxConcurrency Requested number of workers; 0 selects the arena's
/// default. The request is capped first by that default and then by the
/// value returned by LogicalCPUBandwithControl().
///
/// Side effects: records the final size in fNWorkers and calls
/// ROOT::EnableThreadSafety().
////////////////////////////////////////////////////////////////////////////////
RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{})
{
   const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
   maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads;
   const unsigned bcCpus = LogicalCPUBandwithControl();
   if (maxConcurrency > bcCpus) {
      // bcCpus is unsigned, so it must be formatted with %u
      Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %u threads accordingly", bcCpus);
      maxConcurrency = bcCpus;
   }
   if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      // Adjacent literals are concatenated verbatim: keep the trailing space
      // so the message does not read "workersfrom".
      Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers "
                                   "from this task arena available for execution.");
   }
   fTBBArena->initialize(maxConcurrency);
   fNWorkers = maxConcurrency;
   ROOT::EnableThreadSafety();
}

////////////////////////////////////////////////////////////////////////////////
/// Resets the recorded arena size to zero, so that TaskArenaSize() reports
/// zero once this wrapper has been destroyed.
////////////////////////////////////////////////////////////////////////////////
RTaskArenaWrapper::~RTaskArenaWrapper()
{
   RTaskArenaWrapper::fNWorkers = 0;
}

/// Size recorded by the constructor; zero until a wrapper is constructed and
/// again after one is destroyed.
unsigned RTaskArenaWrapper::fNWorkers{0u};

////////////////////////////////////////////////////////////////////////////////
/// Returns the recorded number of workers of the task arena (zero when no
/// wrapper is alive).
////////////////////////////////////////////////////////////////////////////////
unsigned RTaskArenaWrapper::TaskArenaSize()
{
   const unsigned currentSize = fNWorkers;
   return currentSize;
}
////////////////////////////////////////////////////////////////////////////////
/// Gives callers a reference to the arena owned by this wrapper, for invoking
/// arena operations on it directly.
////////////////////////////////////////////////////////////////////////////////
ROOT::ROpaqueTaskArena &RTaskArenaWrapper::Access()
{
   ROOT::ROpaqueTaskArena &wrappedArena = *fTBBArena;
   return wrappedArena;
}

////////////////////////////////////////////////////////////////////////////////
/// Returns a shared pointer to the global RTaskArenaWrapper, creating it if no
/// instance is currently alive.
///
/// Only a weak reference is kept here, so the wrapper is destroyed when the
/// last external shared_ptr goes away and a later call creates a fresh one.
/// The lookup and creation are serialized by a function-local mutex.
///
/// \param maxConcurrency Size requested for a newly created arena. If an arena
/// is already alive and a non-zero, different size is requested, a
/// warning is emitted and the existing arena is returned unchanged.
/// \return Shared pointer to the global RTaskArenaWrapper.
////////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
{
   static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;

   static std::mutex m;
   const std::lock_guard<std::mutex> lock{m};
   if (auto sp = weak_GTAWrapper.lock()) {
      if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
         // TaskArenaSize() returns unsigned, hence %u
         Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %u threads",
                 sp->TaskArenaSize());
      }
      return sp;
   }
   // The constructor is private and only this friend may call it, so
   // std::make_shared cannot be used here.
   std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency));
   weak_GTAWrapper = sp;
   return sp;
}

} // namespace Internal
} // namespace ROOT
Loading