Skip to content

Commit

Permalink
[DF] Reduce duration of concurrency unit test
Browse files Browse the repository at this point in the history
by about two orders of magnitude (150 s --> 1 s) by using a
reasonable number of threads throughout the battery of tests
(in some cases the machine could be heavily overcommitted) and
by eliminating the RDF jitting, which happens sequentially.
  • Loading branch information
dpiparo committed Sep 11, 2023
1 parent d606722 commit d4f699a
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions tree/dataframe/test/dataframe_concurrency.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ TEST(RDFConcurrency, NestedParallelismBetweenDefineCalls)

// this lambda will be used to introduce nested parallelism via a dummy Filter
auto manysleeps = [&] {
ROOT::TThreadExecutor().Foreach(
ROOT::TThreadExecutor(NUM_THREADS).Foreach(
[] { std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200)); }, 8);
return true;
};
Expand Down Expand Up @@ -93,13 +93,13 @@ void SimpleParallelRDFs()
{
// Run the RDF construction and the event loop in parallel
auto func = [] {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(10);
return df.Define("x", "rdfentry_").Mean("x").GetValue();
};
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(10);
return df.Define("x", [](ULong64_t ievt) { return ievt; }, {"rdfentry_"}).Mean<ULong64_t>("x").GetValue();
};

ROOT::TThreadExecutor pool;
auto res = pool.Map(func, 64);
ROOT::TThreadExecutor pool(NUM_THREADS);
auto res = pool.Map(func, NUM_THREADS);

const auto ref = func();

Expand Down Expand Up @@ -127,14 +127,14 @@ void SimpleParallelRDFLoops()
auto create_df = [] {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(10);
return df.Define("x", "rdfentry_").Mean("x");
return df.Define("x", [](ULong64_t ievt) {return ievt;}, {"rdfentry_"}).Mean<ULong64_t>("x");
};

std::vector<ROOT::RDF::RResultPtr<double>> vals(64);
for (auto i = 0u; i < vals.size(); i++)
vals[i] = create_df();

ROOT::TThreadExecutor pool;
ROOT::TThreadExecutor pool(NUM_THREADS);
auto func = [](ROOT::RDF::RResultPtr<double> rptr){ return rptr.GetValue(); };
auto res = pool.Map(func, vals);

Expand Down Expand Up @@ -162,13 +162,14 @@ void ParallelRDFSnapshots()
{
// Run the RDF construction and the event loop in parallel
const auto nevts = 100u;
auto func = [&] (int i) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(nevts);
df.Define("x", "(int)rdfentry_").Snapshot("tree", "dataframe_parallel_snapshots_" + std::to_string(i) + ".root");
};
auto func = [&](int i) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(nevts);
df.Define("x", [](ULong64_t ievt) { return int(ievt); }, {"rdfentry_"})
.Snapshot<int>("tree", "dataframe_parallel_snapshots_" + std::to_string(i) + ".root", {"x"});
};

ROOT::TThreadExecutor pool;
ROOT::TThreadExecutor pool(NUM_THREADS);
std::vector<int> vals = {0, 1, 2, 3, 4, 5, 6, 7};
pool.Foreach(func, vals);

Expand Down Expand Up @@ -207,11 +208,11 @@ void ParallelRDFCaches()
auto func = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() / RAND_MAX * 200));
ROOT::RDataFrame df(nevts);
auto cache = df.Define("x", "(int)rdfentry_").Cache("x");
return cache.Sum("x").GetValue();
auto cache = df.Define("x", [](ULong64_t ievt) {return (int)ievt;}, {"rdfentry_"}).Cache<int>({"x"});
return cache.Sum<int>("x").GetValue();
};

ROOT::TThreadExecutor pool;
ROOT::TThreadExecutor pool(NUM_THREADS);
auto res = pool.Map(func, 8);

auto sum_ref = 0u;
Expand Down

0 comments on commit d4f699a

Please sign in to comment.