-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathRTaskArena.cxx
128 lines (118 loc) · 4.83 KB
/
RTaskArena.cxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "ROOT/RTaskArena.hxx"
#include "ROpaqueTaskArena.hxx"
#include "TError.h"
#include "TROOT.h"
#include "TThread.h"
#include <cmath>
#include <fstream>
#include <mutex>
#include <thread>
#include "tbb/task_arena.h"
#define TBB_PREVIEW_GLOBAL_CONTROL 1 // required for TBB versions preceding 2019_U4
#include "tbb/global_control.h"
//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::Internal::RTaskArenaWrapper
/// \ingroup Parallelism
/// \brief Wrapper over tbb::task_arena
///
/// This class is a wrapper over tbb::task_arena, in order to keep
/// TBB away from ROOT's headers. We keep a single global instance to be
/// used by any parallel ROOT class with TBB as a backend.
///
/// TThreadExecutor, IMT and any class relying on TBB will get a pointer
/// to the scheduler through `ROOT::Internal::GetGlobalTaskArena()`, which
/// will return a reference to the only pointer to the TBB scheduler that
/// will be active in any ROOT Process.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] auto gTA = ROOT::Internal::GetGlobalTaskArena(nWorkers) //get a shared_ptr to the global arena and initialize
/// //it with nWorkers. Enable thread safety in ROOT
/// root[] gTA->TaskArenaSize() // Get the current size of the arena (number of worker threads)
/// root[] gTA->Access() //reference to the internal ROOT::ROpaqueTaskArena (the wrapped tbb::task_arena) for
/// //interacting directly with it (needed to call operations such as execute)
/// root[] gTA->Access().max_concurrency() // call to tbb::task_arena::max_concurrency()
/// ~~~
///
//////////////////////////////////////////////////////////////////////////
namespace ROOT {
namespace Internal {
////////////////////////////////////////////////////////////////////////////////
/// Returns the number of logical CPUs this process should assume it can use.
///
/// When R__LINUX is defined, the cgroup CFS quota and period files are read.
/// If both hold a successfully parsed, strictly positive value, the result is
/// ceil(quota / period). In every other case (files missing or unreadable,
/// non-positive quota, non-positive period) the function falls back to
/// std::thread::hardware_concurrency().
///
/// \return A CPU count that is always >= 1: hardware_concurrency() is allowed
///         to report 0 when the value is not computable, which is mapped to 1.
////////////////////////////////////////////////////////////////////////////////
int LogicalCPUBandwithControl()
{
#ifdef R__LINUX
   // Check for CFS bandwidth control
   std::ifstream quotaFile("/sys/fs/cgroup/cpuacct/cpu.cfs_quota_us"); // quota file
   double cfsQuota = 0.;
   // A failed open or parse leaves the stream in a failed state: the extraction
   // result is tested instead of trusting the variable afterwards.
   if (quotaFile >> cfsQuota && cfsQuota > 0.) {
      std::ifstream periodFile("/sys/fs/cgroup/cpuacct/cpu.cfs_period_us"); // period file
      double cfsPeriod = 0.;
      // Require a positive period so the division below is well defined.
      if (periodFile >> cfsPeriod && cfsPeriod > 0.)
         return static_cast<int>(std::ceil(cfsQuota / cfsPeriod));
   }
#endif
   const unsigned hwThreads = std::thread::hardware_concurrency();
   return hwThreads > 0 ? static_cast<int>(hwThreads) : 1;
}
////////////////////////////////////////////////////////////////////////////////
/// Initializes the tbb::task_arena within RTaskArenaWrapper.
///
/// \param maxConcurrency Requested number of workers; 0 selects the value the
///        not-yet-initialized arena reports through max_concurrency().
///
/// * Can't be reinitialized
/// * A non-zero request is capped at the arena's default concurrency
/// * Checks for CPU bandwidth control and avoids oversubscribing
/// * If no BC in place and maxConcurrency<1, defaults to the default tbb number of threads,
///   which is CPU affinity aware
/// * Only warns (does not adjust) when tbb::global_control allows fewer workers
/// * Records the final value in fNWorkers and enables ROOT thread safety
////////////////////////////////////////////////////////////////////////////////
RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new ROpaqueTaskArena{})
{
   const unsigned tbbDefaultNumberThreads = fTBBArena->max_concurrency(); // not initialized, automatic state
   maxConcurrency = maxConcurrency > 0 ? std::min(maxConcurrency, tbbDefaultNumberThreads) : tbbDefaultNumberThreads;
   const unsigned bcCpus = LogicalCPUBandwithControl();
   if (maxConcurrency > bcCpus) {
      // bcCpus is unsigned, so it is printed with %u.
      Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %u threads accordingly", bcCpus);
      maxConcurrency = bcCpus;
   }
   if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
      // The two literals are concatenated by the compiler: the trailing space keeps the words apart.
      Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers "
                                   "from this task arena available for execution.");
   }
   fTBBArena->initialize(maxConcurrency);
   fNWorkers = maxConcurrency;
   ROOT::EnableThreadSafety();
}
////////////////////////////////////////////////////////////////////////////////
/// Destructor. Resets the class-wide worker counter, so TaskArenaSize()
/// reports 0 afterwards.
////////////////////////////////////////////////////////////////////////////////
RTaskArenaWrapper::~RTaskArenaWrapper()
{
   RTaskArenaWrapper::fNWorkers = 0u;
}
/// Definition of the static worker counter; it is zero until a constructor stores the arena size in it.
unsigned RTaskArenaWrapper::fNWorkers{0u};
////////////////////////////////////////////////////////////////////////////////
/// Returns the current value of the static worker counter fNWorkers.
////////////////////////////////////////////////////////////////////////////////
unsigned RTaskArenaWrapper::TaskArenaSize()
{
   const unsigned nWorkers = RTaskArenaWrapper::fNWorkers;
   return nWorkers;
}
////////////////////////////////////////////////////////////////////////////////
/// Gives callers direct access to the arena held by this wrapper.
///
/// \return Reference to the ROOT::ROpaqueTaskArena that fTBBArena points to.
////////////////////////////////////////////////////////////////////////////////
ROOT::ROpaqueTaskArena &RTaskArenaWrapper::Access()
{
   ROOT::ROpaqueTaskArena &arena = *fTBBArena;
   return arena;
}
////////////////////////////////////////////////////////////////////////////////
/// Returns a shared_ptr to the task arena wrapper, creating it when none is alive.
///
/// A function-local weak_ptr tracks the wrapper, and a function-local mutex
/// serializes the lookup-or-create sequence below.
///
/// \param maxConcurrency Number of workers passed to the RTaskArenaWrapper
///        constructor when a new wrapper has to be created. If a wrapper is
///        already alive it is returned unchanged; a warning is printed when a
///        non-zero request differs from its current size.
/// \return Shared ownership of the live (or newly created) wrapper.
////////////////////////////////////////////////////////////////////////////////
std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> GetGlobalTaskArena(unsigned maxConcurrency)
{
   static std::weak_ptr<ROOT::Internal::RTaskArenaWrapper> weak_GTAWrapper;
   static std::mutex m;
   const std::lock_guard<std::mutex> lock{m};
   if (auto sp = weak_GTAWrapper.lock()) {
      if (maxConcurrency && (sp->TaskArenaSize() != maxConcurrency)) {
         // TaskArenaSize() returns unsigned, so it is printed with %u.
         Warning("RTaskArenaWrapper", "There's already an active task arena. Proceeding with the current %u threads",
                 sp->TaskArenaSize());
      }
      return sp;
   }
   // No live wrapper: build one and remember it weakly, so that it is destroyed
   // once the last shared_ptr handed out goes away.
   std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> sp(new ROOT::Internal::RTaskArenaWrapper(maxConcurrency));
   weak_GTAWrapper = sp;
   return sp;
}
} // namespace Internal
} // namespace ROOT