Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

warn on tbb::global_control interferences #6814

Merged
merged 2 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/imt/src/RTaskArena.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <mutex>
#include <thread>
#include "tbb/task_arena.h"
#include "tbb/global_control.h"

//////////////////////////////////////////////////////////////////////////
///
Expand Down Expand Up @@ -75,6 +76,10 @@ RTaskArenaWrapper::RTaskArenaWrapper(unsigned maxConcurrency) : fTBBArena(new tb
Warning("RTaskArenaWrapper", "CPU Bandwith Control Active. Proceeding with %d threads accordingly", bcCpus);
maxConcurrency = bcCpus;
}
if (maxConcurrency > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
Warning("RTaskArenaWrapper", "tbb::global_control is active, limiting the number of parallel workers"
"from this task arena available for execution.");
}
fTBBArena->initialize(maxConcurrency);
fNWorkers = maxConcurrency;
ROOT::EnableThreadSafety();
Expand Down
18 changes: 18 additions & 0 deletions core/imt/src/TThreadExecutor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ TThreadExecutor::TThreadExecutor(UInt_t nThreads)
void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step,
const std::function<void(unsigned int i)> &f)
{
if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
Warning("TThreadExecutor::ParallelFor",
"tbb::global_control is limiting the number of parallel workers."
" Proceeding with %zu threads this time",
tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
}
fTaskArenaW->Access().execute([&] {
tbb::this_task_arena::isolate([&] {
tbb::parallel_for(start, end, step, f);
Expand All @@ -171,6 +177,12 @@ void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned
/// Reduce `objs` with `redfunc` by running ROOT::Internal::ParallelReduceHelper<double>
/// inside the task arena held by this executor.
/// \param objs    values to be reduced
/// \param redfunc binary reduction function applied to the values
/// \return the value produced by the reduction helper
///
/// A warning is emitted first when tbb::global_control currently allows fewer
/// parallel workers than the size of this executor's pool.
double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
                                       const std::function<double(double a, double b)> &redfunc)
{
   // Read the active limit exactly once: the value that is compared and the value that is
   // reported are then guaranteed to agree, even if another thread creates or destroys a
   // tbb::global_control object in the meantime.
   const auto maxAllowed = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
   if (GetPoolSize() > maxAllowed) {
      Warning("TThreadExecutor::ParallelReduce",
              "tbb::global_control is limiting the number of parallel workers."
              " Proceeding with %zu threads this time",
              maxAllowed);
   }
   return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc); });
}

Expand All @@ -183,6 +195,12 @@ double TThreadExecutor::ParallelReduce(const std::vector<double> &objs,
float TThreadExecutor::ParallelReduce(const std::vector<float> &objs,
const std::function<float(float a, float b)> &redfunc)
{
if (GetPoolSize() > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)) {
Warning("TThreadExecutor::ParallelReduce",
"tbb::global_control is limiting the number of parallel workers."
" Proceeding with %zu threads this time",
tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic, repeated three times, could be factored out into a small helper function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With only three occurrences in such a short file, I feel that creating a helper just to print the warning would be overkill, and creating a helper that also includes the call to GetPoolSize() would complicate the code.

return fTaskArenaW->Access().execute([&] { return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc); });
}

Expand Down
2 changes: 1 addition & 1 deletion core/imt/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# For the licensing terms see $ROOTSYS/LICENSE.
# For the list of contributors see $ROOTSYS/README/CREDITS.

ROOT_ADD_GTEST(testImt testRTaskArena.cxx testTFuture.cxx testTTaskGroup.cxx LIBRARIES Imt ${TBB_LIBRARIES})
ROOT_ADD_GTEST(testImt testRTaskArena.cxx testTBBGlobalControl.cxx testTFuture.cxx testTTaskGroup.cxx LIBRARIES Imt ${TBB_LIBRARIES})
55 changes: 55 additions & 0 deletions core/imt/test/testTBBGlobalControl.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "TROOT.h"
#include "ROOT/RTaskArena.hxx"
#include "ROOT/TThreadExecutor.hxx"
#include "ROOTUnitTestSupport.h"
#include "gtest/gtest.h"
#include "tbb/global_control.h"

#ifdef R__USE_IMT

// Concurrency level shared by the tests below, taken from ROOT::Internal::LogicalCPUBandwithControl().
// Both tests return early when this value is <= 1.
const unsigned maxConcurrency = ROOT::Internal::LogicalCPUBandwithControl();

// Checks that requesting the global task arena with maxConcurrency workers while a
// tbb::global_control caps max_allowed_parallelism at 1 emits the "RTaskArenaWrapper" warning.
TEST(TBBGlobalControl, RTaskArena)
{
if (maxConcurrency <= 1)
return; // skip this test on systems with only 1 core
auto gTAInstance = ROOT::Internal::GetGlobalTaskArena(maxConcurrency);
// Cap TBB's max_allowed_parallelism at 1 for as long as `c` is alive (until the end of this test).
tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1u);
// Drop our handle before asking for the arena again.
// NOTE(review): presumably this lets the next GetGlobalTaskArena call construct a new
// RTaskArenaWrapper, whose constructor performs the check — confirm against RTaskArena.cxx.
gTAInstance.reset();
// The expected text is two adjacent string literals with no space between them
// ("...workersfrom this..."); it must match the emitted warning message verbatim.
ROOT_EXPECT_WARNING(gTAInstance = ROOT::Internal::GetGlobalTaskArena(maxConcurrency);,
"RTaskArenaWrapper",
"tbb::global_control is active, limiting the number of parallel workers"
"from this task arena available for execution.");
}

// Runs Map and both Reduce overloads of a TThreadExecutor twice: first with no
// tbb::global_control limit in place, then with max_allowed_parallelism capped at 1,
// where each call is expected to emit the corresponding TThreadExecutor warning.
TEST(TBBGlobalControl, TThreadExecutor)
{
// ***See them pass***
ROOT::TThreadExecutor executor{maxConcurrency};
executor.Map([]() { return 1; }, 10); // ParallelFor
std::vector<double> vd{0., 1., 2.};
executor.Reduce(vd, std::plus<double>{}); // ParallelReduce double
std::vector<float> vf{0., 1., 2.};
executor.Reduce(vf, std::plus<float>{}); // ParallelReduce float

// From here until the end of the test, TBB's max_allowed_parallelism is capped at 1.
tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1u);

// The warnings are only emitted when the pool size exceeds the active limit (1 here).
// NOTE(review): assumes the executor's pool size equals maxConcurrency — confirm.
if (maxConcurrency <= 1)
return; // skip this test on systems with only 1 core
// ***See them warn***
// Each expected message ends with the active limit, hence "1 threads".
ROOT_EXPECT_WARNING(executor.Map([]() { return 1; }, 10), "TThreadExecutor::ParallelFor",
"tbb::global_control is limiting the number of parallel workers."
" Proceeding with 1 threads this time");

// ParallelReduce double
ROOT_EXPECT_WARNING(executor.Reduce(vd, std::plus<double>{}), "TThreadExecutor::ParallelReduce",
"tbb::global_control is limiting the number of parallel workers."
" Proceeding with 1 threads this time");

// ParallelReduce float
ROOT_EXPECT_WARNING(executor.Reduce(vf, std::plus<float>{}), "TThreadExecutor::ParallelReduce",
"tbb::global_control is limiting the number of parallel workers."
" Proceeding with 1 threads this time");
}

#endif