diff --git a/FWCore/Concurrency/interface/FinalWaitingTask.h b/FWCore/Concurrency/interface/FinalWaitingTask.h new file mode 100644 index 0000000000000..2339ec1ebef3f --- /dev/null +++ b/FWCore/Concurrency/interface/FinalWaitingTask.h @@ -0,0 +1,60 @@ +#ifndef FWCore_Concurrency_FinalWaitingTask_h +#define FWCore_Concurrency_FinalWaitingTask_h +// -*- C++ -*- +// +// Package: FWCore/Concurrency +// Class : FinalWaitingTask +// +/**\class FinalWaitingTask FinalWaitingTask.h "FWCore/Concurrency/interface/FinalWaitingTask.h" + + Description: [one line class summary] + + Usage: + Use this class on the stack to signal the final task to be run. + Call done() to check to see if the task was run and check value of + exceptionPtr() to see if an exception was thrown by any task in the group. + +*/ +// +// Original Author: Christopher Jones +// Created: Tue, 12 Jul 2022 18:45:15 GMT +// + +// system include files +#include "oneapi/tbb/task_group.h" + +// user include files +#include "FWCore/Concurrency/interface/WaitingTask.h" + +// forward declarations +namespace edm { + class FinalWaitingTask : public WaitingTask { + public: + FinalWaitingTask() = delete; + explicit FinalWaitingTask(tbb::task_group& iGroup) + : m_group{&iGroup}, m_handle{iGroup.defer([]() {})}, m_done{false} {} + + void execute() final { m_done = true; } + + [[nodiscard]] bool done() const noexcept { return m_done.load(); } + + void wait() { + m_group->wait(); + if (exceptionPtr()) { + std::rethrow_exception(exceptionPtr()); + } + } + std::exception_ptr waitNoThrow() { + m_group->wait(); + return exceptionPtr(); + } + + private: + void recycle() final { m_group->run(std::move(m_handle)); } + tbb::task_group* m_group; + tbb::task_handle m_handle; + std::atomic m_done; + }; + +} // namespace edm +#endif diff --git a/FWCore/Concurrency/interface/WaitingTask.h b/FWCore/Concurrency/interface/WaitingTask.h index 468890091e892..d4c75cb707345 100644 --- a/FWCore/Concurrency/interface/WaitingTask.h +++ b/FWCore/Concurrency/interface/WaitingTask.h @@ -68,23 +68,6 @@ namespace edm { std::atomic m_ptrSet = false; }; - /** Use this class on the stack to signal the final task to be run. - Call done() to check to see if the task was run and check value of - exceptionPtr() to see if an exception was thrown by any task in the group. - */ - class FinalWaitingTask : public WaitingTask { - public: - FinalWaitingTask() : m_done{false} {} - - void execute() final { m_done = true; } - - bool done() const { return m_done.load(); } - - private: - void recycle() final {} - std::atomic m_done; - }; - template class FunctorWaitingTask : public WaitingTask { public: diff --git a/FWCore/Concurrency/test/test_catch2_WaitingTaskChain.cc b/FWCore/Concurrency/test/test_catch2_WaitingTaskChain.cc index 3070b841aaf5e..9dd0ee0e75af9 100644 --- a/FWCore/Concurrency/test/test_catch2_WaitingTaskChain.cc +++ b/FWCore/Concurrency/test/test_catch2_WaitingTaskChain.cc @@ -11,6 +11,7 @@ #include "oneapi/tbb/global_control.h" #include "FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" TEST_CASE("Test chain::first", "[chain::first]") { oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 1); @@ -19,8 +20,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](edm::WaitingTaskHolder h) { @@ -31,7 +32,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -40,8 +41,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](auto h) { @@ -56,7 +57,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 2); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -65,8 +66,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | then | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](auto h) { @@ -85,7 +86,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 3); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -94,8 +95,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | then | then | runLast") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; first([&count](auto h) { @@ -109,7 +110,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { REQUIRE(count.load() == 3); }) | runLast(edm::WaitingTaskHolder(group, &waitTask)); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 3); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -118,8 +119,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("exception -> first | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](edm::WaitingTaskHolder h) { @@ -130,7 +131,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::make_exception_ptr(std::exception())); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 0); REQUIRE(waitTask.done()); REQUIRE(waitTask.exceptionPtr()); @@ -139,8 +140,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first(exception) | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](edm::WaitingTaskHolder h) { @@ -152,7 +153,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); REQUIRE(waitTask.exceptionPtr()); @@ -161,8 +162,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first(exception) | then | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](auto h) { @@ -182,7 +183,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); REQUIRE(waitTask.exceptionPtr()); @@ -193,8 +194,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](std::exception_ptr const* iPtr, edm::WaitingTaskHolder h) { @@ -206,7 +207,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -215,8 +216,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](std::exception_ptr const* iPtr, auto h) { @@ -233,7 +234,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 2); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -242,8 +243,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | then | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](std::exception_ptr const* iPtr, auto h) { @@ -265,7 +266,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 3); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -274,8 +275,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("exception -> first | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](std::exception_ptr const* iPtr, edm::WaitingTaskHolder h) { @@ -287,7 +288,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::make_exception_ptr(std::exception())); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -296,8 +297,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("exception -> first | then | lastTask") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first([&count](std::exception_ptr const* iPtr, edm::WaitingTaskHolder h) { @@ -315,7 +316,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::make_exception_ptr(std::exception())); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 2); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -327,8 +328,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { std::atomic count{0}; std::atomic exceptCount{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first(ifException([&exceptCount](std::exception_ptr const& iPtr) { @@ -342,7 +343,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(exceptCount.load() == 0); REQUIRE(count.load() == 1); REQUIRE(waitTask.done()); @@ -353,8 +354,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { std::atomic count{0}; std::atomic exceptCount{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first(ifException([&exceptCount](std::exception_ptr const& iPtr) { @@ -375,7 +376,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(exceptCount.load() == 0); REQUIRE(count.load() == 2); REQUIRE(waitTask.done()); @@ -386,8 +387,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { std::atomic count{0}; std::atomic exceptCount{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first(ifException([&exceptCount](std::exception_ptr const& iPtr) { @@ -415,7 +416,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::exception_ptr()); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(exceptCount.load() == 0); REQUIRE(count.load() == 3); REQUIRE(waitTask.done()); @@ -426,8 +427,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { std::atomic count{0}; std::atomic exceptCount{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; auto h = first(ifException([&exceptCount](std::exception_ptr const& iPtr) { @@ -455,7 +456,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { h.doneWaiting(std::make_exception_ptr(std::exception())); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(exceptCount.load() == 3); REQUIRE(count.load() == 0); REQUIRE(waitTask.done()); @@ -467,8 +468,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | ifThen(true) | then | runLast") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; first([&count](auto h) { @@ -482,7 +483,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { REQUIRE(count.load() == 3); }) | runLast(edm::WaitingTaskHolder(group, &waitTask)); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 3); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); @@ -491,8 +492,8 @@ TEST_CASE("Test chain::first", "[chain::first]") { SECTION("first | ifThen(false) | then | runLast") { std::atomic count{0}; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { using namespace edm::waiting_task::chain; first([&count](auto h) { @@ -506,7 +507,7 @@ TEST_CASE("Test chain::first", "[chain::first]") { REQUIRE(count.load() == 2); }) | runLast(edm::WaitingTaskHolder(group, &waitTask)); } - group.wait(); + waitTask.waitNoThrow(); REQUIRE(count.load() == 2); REQUIRE(waitTask.done()); REQUIRE(not waitTask.exceptionPtr()); diff --git a/FWCore/Concurrency/test/waitingtasklist_t.cppunit.cpp b/FWCore/Concurrency/test/waitingtasklist_t.cppunit.cpp index 0de294b0acdf7..379ec7d3b3f27 100644 --- a/FWCore/Concurrency/test/waitingtasklist_t.cppunit.cpp +++ b/FWCore/Concurrency/test/waitingtasklist_t.cppunit.cpp @@ -14,6 +14,7 @@ #include #include "oneapi/tbb/task.h" #include "FWCore/Concurrency/interface/WaitingTaskList.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" class WaitingTaskList_test : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(WaitingTaskList_test); @@ -185,7 +186,7 @@ void WaitingTaskList_test::stressTest() { unsigned int index = 1000; const unsigned int nTasks = 10000; while (0 != --index) { - edm::FinalWaitingTask waitTask; + edm::FinalWaitingTask waitTask{group}; auto* pWaitTask = &waitTask; { edm::WaitingTaskHolder waitTaskH(group, pWaitTask); @@ -199,9 +200,7 @@ void WaitingTaskList_test::stressTest() { std::thread doneWaitThread([&waitList, waitTaskH] { waitList.doneWaiting(std::exception_ptr{}); }); std::shared_ptr(&doneWaitThread, join_thread); } - do { - group.wait(); - } while (not waitTask.done()); + waitTask.wait(); } } diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index a069b62080bf7..1d1a616824a5f 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -56,6 +56,7 @@ #include "FWCore/ServiceRegistry/interface/SystemBounds.h" #include "FWCore/Concurrency/interface/WaitingTask.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" #include "FWCore/Concurrency/interface/chain_first.h" @@ -593,9 +594,9 @@ namespace edm { } void EventProcessor::taskCleanup() { - edm::FinalWaitingTask task; + edm::FinalWaitingTask task{taskGroup_}; espController_->endIOVsAsync(edm::WaitingTaskHolder{taskGroup_, &task}); - taskGroup_.wait(); + task.waitNoThrow(); assert(task.done()); } @@ -708,8 +709,8 @@ namespace edm { for_all(subProcesses_, [](auto& subProcess) { subProcess.doBeginJob(); }); actReg_->postBeginJobSignal_(); - FinalWaitingTask last; oneapi::tbb::task_group group; + FinalWaitingTask last{group}; using namespace edm::waiting_task::chain; first([this](auto nextTask) { for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) { @@ -722,10 +723,7 @@ namespace edm { }) | lastTask(nextTask); } }) | runLast(WaitingTaskHolder(group, &last)); - group.wait(); - if (last.exceptionPtr()) { - std::rethrow_exception(last.exceptionPtr()); - } + last.wait(); } void EventProcessor::endJob() { @@ -738,8 +736,8 @@ namespace edm { using namespace edm::waiting_task::chain; - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; { //handle endStream transitions @@ -777,7 +775,7 @@ namespace edm { }) | lastTask(taskHolder); } } - group.wait(); + waitTask.waitNoThrow(); auto actReg = actReg_.get(); c.call([actReg]() { actReg->preEndJobSignal_(); }); @@ -1044,19 +1042,13 @@ namespace edm { processBlockPrincipal.fillProcessBlockPrincipal(processConfiguration_->processName()); using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal); beginGlobalTransitionAsync( WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); beginProcessBlockSucceeded = true; } @@ -1067,27 +1059,17 @@ namespace edm { readProcessBlock(processBlockPrincipal); using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal); beginGlobalTransitionAsync( WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, subProcesses_); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); - FinalWaitingTask writeWaitTask; + FinalWaitingTask writeWaitTask{taskGroup_}; writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::Input); - do { - taskGroup_.wait(); - } while (not writeWaitTask.done()); - if (writeWaitTask.exceptionPtr()) { - std::rethrow_exception(writeWaitTask.exceptionPtr()); - } + writeWaitTask.wait(); processBlockPrincipal.clearPrincipal(); for (auto& s : subProcesses_) { @@ -1100,7 +1082,7 @@ namespace edm { ProcessBlockPrincipal& processBlockPrincipal = principalCache_.processBlockPrincipal(); using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal); endGlobalTransitionAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask), @@ -1109,22 +1091,12 @@ namespace edm { serviceToken_, subProcesses_, cleaningUpAfterException); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); if (beginProcessBlockSucceeded) { - FinalWaitingTask writeWaitTask; + FinalWaitingTask writeWaitTask{taskGroup_}; writeProcessBlockAsync(edm::WaitingTaskHolder{taskGroup_, &writeWaitTask}, ProcessBlockType::New); - do { - taskGroup_.wait(); - } while (not writeWaitTask.done()); - if (writeWaitTask.exceptionPtr()) { - std::rethrow_exception(writeWaitTask.exceptionPtr()); - } + writeWaitTask.wait(); } processBlockPrincipal.clearPrincipal(); @@ -1162,7 +1134,7 @@ namespace edm { if (looper_ && looperBeginJobRun_ == false) { looper_->copyInfo(ScheduleInfo(schedule_.get())); - FinalWaitingTask waitTask; + FinalWaitingTask waitTask{taskGroup_}; using namespace edm::waiting_task::chain; chain::first([this, &es](auto nextTask) { looper_->esPrefetchAsync(nextTask, es, Transition::BeginRun, serviceToken_); @@ -1172,16 +1144,11 @@ namespace edm { looper_->doStartingNewLoop(); }) | runLast(WaitingTaskHolder(taskGroup_, &waitTask)); - do { - taskGroup_.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } { using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; using namespace edm::waiting_task::chain; chain::first([&runPrincipal, &es, this](auto waitTask) { @@ -1197,16 +1164,11 @@ namespace edm { looper_->doBeginRun(runPrincipal, es, &processContext_); }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask)); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } { //To wait, the ref count has to be 1+#streams - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; @@ -1217,12 +1179,7 @@ namespace edm { transitionInfo, serviceToken_, subProcesses_); - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr()) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } FDEBUG(1) << "\tstreamBeginRun " << run << "\n"; if (looper_) { @@ -1242,16 +1199,14 @@ namespace edm { if (globalBeginSucceeded) { RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run); if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) { - FinalWaitingTask t; + FinalWaitingTask t{taskGroup_}; MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata(); mergeableRunProductMetadata->preWriteRun(); writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata); - do { - taskGroup_.wait(); - } while (not t.done()); + auto exceptn = t.waitNoThrow(); mergeableRunProductMetadata->postWriteRun(); - if (t.exceptionPtr()) { - std::rethrow_exception(t.exceptionPtr()); + if (exceptn) { + std::rethrow_exception(exceptn); } } } @@ -1279,7 +1234,7 @@ namespace edm { auto const& es = esp_->eventSetupImpl(); if (globalBeginSucceeded) { //To wait, the ref count has to be 1+#streams - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; @@ -1291,19 +1246,14 @@ namespace edm { serviceToken_, subProcesses_, cleaningUpAfterException); - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr()) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } FDEBUG(1) << "\tstreamEndRun " << run << "\n"; if (looper_) { //looper_->doStreamEndRun(schedule_->streamID(),runPrincipal, es); } { - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; using namespace edm::waiting_task::chain; chain::first([this, &runPrincipal, &es, cleaningUpAfterException](auto nextTask) { @@ -1317,18 +1267,13 @@ namespace edm { looper_->doEndRun(runPrincipal, es, &processContext_); }) | runLast(WaitingTaskHolder(taskGroup_, &globalWaitTask)); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } FDEBUG(1) << "\tendRun " << run << "\n"; } InputSource::ItemType EventProcessor::processLumis(std::shared_ptr const& iRunResource) { - FinalWaitingTask waitTask; + FinalWaitingTask waitTask{taskGroup_}; if (streamLumiActive_ > 0) { assert(streamLumiActive_ == preallocations_.numberOfStreams()); // Continue after opening a new input file @@ -1339,13 +1284,7 @@ namespace edm { iRunResource, WaitingTaskHolder{taskGroup_, &waitTask}); } - do { - taskGroup_.wait(); - } while (not waitTask.done()); - - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); return lastTransitionType(); } @@ -1660,7 +1599,7 @@ namespace edm { void EventProcessor::endUnfinishedLumi() { if (streamLumiActive_.load() > 0) { - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; { WaitingTaskHolder globalTaskHolder{taskGroup_, &globalWaitTask}; for (unsigned int i = 0; i < preallocations_.numberOfStreams(); ++i) { @@ -1669,12 +1608,7 @@ namespace edm { } } } - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } } diff --git a/FWCore/Framework/src/EventSetupsController.cc b/FWCore/Framework/src/EventSetupsController.cc index bab9305a3d025..dd1ecdca3f4ef 100644 --- a/FWCore/Framework/src/EventSetupsController.cc +++ b/FWCore/Framework/src/EventSetupsController.cc @@ -14,6 +14,7 @@ #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" #include "FWCore/Concurrency/interface/WaitingTaskList.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Framework/interface/DataKey.h" #include "FWCore/Framework/interface/DataProxy.h" #include "FWCore/Framework/src/EventSetupProviderMaker.h" @@ -412,7 +413,7 @@ namespace edm { void synchronousEventSetupForInstance(IOVSyncValue const& syncValue, oneapi::tbb::task_group& iGroup, eventsetup::EventSetupsController& espController) { - FinalWaitingTask waitUntilIOVInitializationCompletes; + FinalWaitingTask waitUntilIOVInitializationCompletes{iGroup}; // These do nothing ... WaitingTaskList dummyWaitingTaskList; @@ -431,13 +432,7 @@ namespace edm { waitingTaskHolder.doneWaiting(std::current_exception()); } } - do { - iGroup.wait(); - } while (not waitUntilIOVInitializationCompletes.done()); - - if (waitUntilIOVInitializationCompletes.exceptionPtr() != nullptr) { - std::rethrow_exception(waitUntilIOVInitializationCompletes.exceptionPtr()); - } + waitUntilIOVInitializationCompletes.wait(); } } // namespace eventsetup } // namespace edm diff --git a/FWCore/Framework/src/SynchronousEventSetupsController.cc b/FWCore/Framework/src/SynchronousEventSetupsController.cc index f2b4d53904a26..4f8aa59ae6c3a 100644 --- a/FWCore/Framework/src/SynchronousEventSetupsController.cc +++ b/FWCore/Framework/src/SynchronousEventSetupsController.cc @@ -13,6 +13,7 @@ #include "FWCore/Framework/src/SynchronousEventSetupsController.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include #include @@ -25,11 +26,9 @@ namespace edm { : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1) {} SynchronousEventSetupsController::~SynchronousEventSetupsController() { - FinalWaitingTask finalTask; + FinalWaitingTask finalTask{taskGroup_}; controller_.endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &finalTask)); - do { - taskGroup_.wait(); - } while (not finalTask.done()); + finalTask.waitNoThrow(); } std::shared_ptr SynchronousEventSetupsController::makeProvider( diff --git a/FWCore/Framework/test/callback_t.cppunit.cc b/FWCore/Framework/test/callback_t.cppunit.cc index f11b9528945ac..b606126283624 100644 --- a/FWCore/Framework/test/callback_t.cppunit.cc +++ b/FWCore/Framework/test/callback_t.cppunit.cc @@ -13,6 +13,7 @@ #include "FWCore/Framework/interface/ComponentDescription.h" #include "FWCore/Concurrency/interface/ThreadsController.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Framework/interface/eventsetuprecord_registration_macro.h" @@ -108,13 +109,11 @@ namespace { edm::ActivityRegistry ar; edm::eventsetup::EventSetupRecordImpl rec(edm::eventsetup::EventSetupRecordKey::makeKey(), &ar); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iCallback.prefetchAsync(edm::WaitingTaskHolder(group, &task), &rec, nullptr, token, edm::ESParentContext{}); - do { - group.wait(); - } while (not task.done()); + task.waitNoThrow(); } } // namespace diff --git a/FWCore/Framework/test/dependentrecord_t.cppunit.cc b/FWCore/Framework/test/dependentrecord_t.cppunit.cc index 0e8a855282a96..466c8efd93bcf 100644 --- a/FWCore/Framework/test/dependentrecord_t.cppunit.cc +++ b/FWCore/Framework/test/dependentrecord_t.cppunit.cc @@ -32,6 +32,7 @@ #include "FWCore/ServiceRegistry/interface/ESParentContext.h" #include "FWCore/Utilities/interface/propagate_const.h" #include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "cppunit/extensions/HelperMacros.h" @@ -758,17 +759,12 @@ namespace { for (size_t i = 0; i != proxies.size(); ++i) { auto rec = iImpl.findImpl(recs[i]); if (rec) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; edm::ServiceToken token; rec->prefetchAsync( edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, token, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } } @@ -787,17 +783,12 @@ namespace { for (size_t i = 0; i != proxies.size(); ++i) { auto rec = iImpl.findImpl(recs[i]); if (rec) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; edm::ServiceToken token; rec->prefetchAsync( edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, token, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } } diff --git a/FWCore/Framework/test/esproducer_t.cppunit.cc b/FWCore/Framework/test/esproducer_t.cppunit.cc index 37cc66145d079..38135314be8e9 100644 --- a/FWCore/Framework/test/esproducer_t.cppunit.cc +++ b/FWCore/Framework/test/esproducer_t.cppunit.cc @@ -29,6 +29,7 @@ #include "FWCore/Utilities/interface/do_nothing_deleter.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include #include @@ -56,16 +57,11 @@ namespace { for (size_t i = 0; i != proxies.size(); ++i) { auto rec = iImpl.findImpl(recs[i]); if (rec) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; rec->prefetchAsync( edm::WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, edm::ServiceToken{}, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } } diff --git a/FWCore/Framework/test/eventsetup_t.cppunit.cc b/FWCore/Framework/test/eventsetup_t.cppunit.cc index 66edfc9a06703..f016dd5386bfa 100644 --- a/FWCore/Framework/test/eventsetup_t.cppunit.cc +++ b/FWCore/Framework/test/eventsetup_t.cppunit.cc @@ -44,6 +44,7 @@ #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/ESProductTag.h" #include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "cppunit/extensions/HelperMacros.h" @@ -424,16 +425,11 @@ namespace { for (size_t i = 0; i != proxies.size(); ++i) { auto rec = iImpl.findImpl(recs[i]); if (rec) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; rec->prefetchAsync( WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, edm::ServiceToken{}, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } } diff --git a/FWCore/Framework/test/eventsetuprecord_t.cppunit.cc b/FWCore/Framework/test/eventsetuprecord_t.cppunit.cc index 06bff6f2579d5..d94f5794bee49 100644 --- a/FWCore/Framework/test/eventsetuprecord_t.cppunit.cc +++ b/FWCore/Framework/test/eventsetuprecord_t.cppunit.cc @@ -32,6 +32,7 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/ServiceRegistry/interface/ESParentContext.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include #include "oneapi/tbb/task_arena.h" @@ -188,16 +189,11 @@ namespace { void prefetch(eventsetup::EventSetupRecordImpl const& iRec) const { auto const& proxies = this->esGetTokenIndicesVector(edm::Transition::Event); for (size_t i = 0; i != proxies.size(); ++i) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; edm::ServiceToken token; iRec.prefetchAsync(WaitingTaskHolder(group, &waitTask), proxies[i], nullptr, token, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } @@ -211,16 +207,11 @@ namespace { void prefetch(eventsetup::EventSetupRecordImpl const& iRec) const { auto const& proxies = this->esGetTokenIndicesVector(edm::Transition::Event); for (size_t i = 0; i != proxies.size(); ++i) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; edm::ServiceToken token; iRec.prefetchAsync(WaitingTaskHolder(group, &waitTask), proxies[i], nullptr, token, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } diff --git a/FWCore/Framework/test/fullchain_t.cppunit.cc b/FWCore/Framework/test/fullchain_t.cppunit.cc index 91d2d421b6b82..0055c387f7c2b 100644 --- a/FWCore/Framework/test/fullchain_t.cppunit.cc +++ b/FWCore/Framework/test/fullchain_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "cppunit/extensions/HelperMacros.h" @@ -56,16 +57,11 @@ namespace { for (size_t i = 0; i != proxies.size(); ++i) { auto rec = iImpl.findImpl(recs[i]); if (rec) { - edm::FinalWaitingTask waitTask; oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; rec->prefetchAsync( WaitingTaskHolder(group, &waitTask), proxies[i], &iImpl, edm::ServiceToken{}, edm::ESParentContext{}); - do { - group.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr()) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); } } } diff --git a/FWCore/Framework/test/global_filter_t.cppunit.cc b/FWCore/Framework/test/global_filter_t.cppunit.cc index 46320df3b16d5..2399cda274d93 100644 --- a/FWCore/Framework/test/global_filter_t.cppunit.cc +++ b/FWCore/Framework/test/global_filter_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -126,16 +127,11 @@ class testGlobalFilter : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicProd : public edm::global::EDFilter<> { diff --git a/FWCore/Framework/test/global_outputmodule_t.cppunit.cc b/FWCore/Framework/test/global_outputmodule_t.cppunit.cc index 64b88e02ec520..01d676db2b1d2 100644 --- a/FWCore/Framework/test/global_outputmodule_t.cppunit.cc +++ b/FWCore/Framework/test/global_outputmodule_t.cppunit.cc @@ -27,6 +27,7 @@ #include "FWCore/Framework/interface/FileBlock.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/Exception.h" @@ -96,16 +97,11 @@ class testGlobalOutputModule : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicOutputModule : public edm::global::OutputModule<> { @@ -220,15 +216,10 @@ testGlobalOutputModule::testGlobalOutputModule() edm::ParentContext parentContext; edm::LumiTransitionInfo info(*m_lbp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) { @@ -236,15 +227,10 @@ testGlobalOutputModule::testGlobalOutputModule() edm::ParentContext parentContext; edm::RunTransitionInfo info(*m_rp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) { diff --git a/FWCore/Framework/test/global_producer_t.cppunit.cc b/FWCore/Framework/test/global_producer_t.cppunit.cc index 956574d86fcc0..1faa7b51a0c3c 100644 --- a/FWCore/Framework/test/global_producer_t.cppunit.cc +++ b/FWCore/Framework/test/global_producer_t.cppunit.cc @@ -24,6 +24,8 @@ #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -126,16 +128,11 @@ class testGlobalProducer : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicProd : public edm::global::EDProducer<> { diff --git a/FWCore/Framework/test/limited_filter_t.cppunit.cc b/FWCore/Framework/test/limited_filter_t.cppunit.cc index 44f123737c1e7..c10ffb094b779 100644 --- a/FWCore/Framework/test/limited_filter_t.cppunit.cc +++ b/FWCore/Framework/test/limited_filter_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -136,16 +137,11 @@ class testLimitedFilter : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicProd : public edm::limited::EDFilter<> { diff --git a/FWCore/Framework/test/limited_outputmodule_t.cppunit.cc b/FWCore/Framework/test/limited_outputmodule_t.cppunit.cc index 65ee54e1b9e1c..e8889f16009af 100644 --- a/FWCore/Framework/test/limited_outputmodule_t.cppunit.cc +++ b/FWCore/Framework/test/limited_outputmodule_t.cppunit.cc @@ -27,6 +27,7 @@ #include "FWCore/Framework/interface/FileBlock.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/Exception.h" @@ -96,16 +97,11 @@ class testLimitedOutputModule : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicOutputModule : public edm::limited::OutputModule<> { @@ -219,15 +215,10 @@ testLimitedOutputModule::testLimitedOutputModule() edm::ParentContext parentContext; edm::LumiTransitionInfo info(*m_lbp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) { @@ -235,15 +226,10 @@ testLimitedOutputModule::testLimitedOutputModule() edm::ParentContext parentContext; edm::RunTransitionInfo info(*m_rp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) { diff --git a/FWCore/Framework/test/limited_producer_t.cppunit.cc b/FWCore/Framework/test/limited_producer_t.cppunit.cc index 6f4c7ff2f3449..49be62e9928bc 100644 --- a/FWCore/Framework/test/limited_producer_t.cppunit.cc +++ b/FWCore/Framework/test/limited_producer_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -136,16 +137,11 @@ class testLimitedProducer : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicProd : public edm::limited::EDProducer<> { diff --git a/FWCore/Framework/test/one_outputmodule_t.cppunit.cc b/FWCore/Framework/test/one_outputmodule_t.cppunit.cc index af3733544c9b9..114e26e4f1dc6 100644 --- a/FWCore/Framework/test/one_outputmodule_t.cppunit.cc +++ b/FWCore/Framework/test/one_outputmodule_t.cppunit.cc @@ -29,6 +29,7 @@ #include "FWCore/Framework/interface/FileBlock.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/Exception.h" @@ -108,16 +109,11 @@ class testOneOutputModule : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::StreamID id, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, id, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } class BasicOutputModule : public edm::one::OutputModule<> { @@ -314,15 +310,10 @@ testOneOutputModule::testOneOutputModule() edm::ParentContext parentContext; edm::LumiTransitionInfo info(*m_lbp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeLumiAsync(edm::WaitingTaskHolder(group, &task), *m_lbp, nullptr, &activityRegistry); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase, edm::OutputModuleCommunicator* iComm) { @@ -330,15 +321,10 @@ testOneOutputModule::testOneOutputModule() edm::ParentContext parentContext; edm::RunTransitionInfo info(*m_rp, *m_es); doWork(iBase, info, edm::StreamID::invalidStreamID(), parentContext); - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; iComm->writeRunAsync(edm::WaitingTaskHolder(group, &task), *m_rp, nullptr, &activityRegistry, nullptr); - do { - group.wait(); - } while (not task.done()); - if (task.exceptionPtr()) { - std::rethrow_exception(task.exceptionPtr()); - } + task.wait(); }; m_transToFunc[Trans::kGlobalCloseInputFile] = [](edm::Worker* iBase, edm::OutputModuleCommunicator*) { diff --git a/FWCore/Framework/test/stream_filter_t.cppunit.cc b/FWCore/Framework/test/stream_filter_t.cppunit.cc index 5c4a7ab262415..0dddc72579af1 100644 --- a/FWCore/Framework/test/stream_filter_t.cppunit.cc +++ b/FWCore/Framework/test/stream_filter_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/Framework/interface/HistoryAppender.h" #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -125,16 +126,11 @@ class testStreamFilter : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } template diff --git a/FWCore/Framework/test/stream_producer_t.cppunit.cc b/FWCore/Framework/test/stream_producer_t.cppunit.cc index dbd05429b56e7..b5a6e90ce3299 100644 --- a/FWCore/Framework/test/stream_producer_t.cppunit.cc +++ b/FWCore/Framework/test/stream_producer_t.cppunit.cc @@ -24,6 +24,7 @@ #include "FWCore/Framework/interface/HistoryAppender.h" #include "FWCore/ServiceRegistry/interface/ParentContext.h" #include "FWCore/ServiceRegistry/interface/StreamContext.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" #include "FWCore/Utilities/interface/Exception.h" @@ -125,16 +126,11 @@ class testStreamProducer : public CppUnit::TestFixture { template void doWork(edm::Worker* iBase, Info const& info, edm::ParentContext const& iContext) { - edm::FinalWaitingTask task; oneapi::tbb::task_group group; + edm::FinalWaitingTask task{group}; edm::ServiceToken token; iBase->doWorkAsync(edm::WaitingTaskHolder(group, &task), info, token, s_streamID0, iContext, nullptr); - do { - group.wait(); - } while (not task.done()); - if (auto e = task.exceptionPtr()) { - std::rethrow_exception(e); - } + task.wait(); } template diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index a32c2b7754129..c3730fe9cf5aa 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -47,6 +47,7 @@ #include "FWCore/Utilities/interface/ExceptionCollector.h" #include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "DataFormats/Provenance/interface/ParentageRegistry.h" @@ -387,11 +388,9 @@ namespace edm { if (beginJobCalled_) { endJob(); } - edm::FinalWaitingTask task; + edm::FinalWaitingTask task{taskGroup_}; espController_->endIOVsAsync(edm::WaitingTaskHolder(taskGroup_, &task)); - do { - taskGroup_.wait(); - } while (not task.done()); + task.waitNoThrow(); }); } @@ -433,15 +432,10 @@ namespace edm { { ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal); using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; beginGlobalTransitionAsync( WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } beginProcessBlockCalled_ = true; } @@ -464,19 +458,14 @@ namespace edm { std::vector emptyList; { using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; beginGlobalTransitionAsync( WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr()) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } { //To wait, the ref count has to be 1+#streams - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; beginStreamsTransitionAsync(WaitingTaskHolder(taskGroup_, &streamLoopWaitTask), @@ -485,13 +474,7 @@ namespace edm { transitionInfo, serviceToken_, emptyList); - - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } beginRunCalled_ = true; } @@ -515,19 +498,14 @@ namespace edm { std::vector emptyList; { using Traits = OccurrenceTraits; - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; beginGlobalTransitionAsync( WaitingTaskHolder(taskGroup_, &globalWaitTask), *schedule_, transitionInfo, serviceToken_, emptyList); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } { //To wait, the ref count has to be 1+#streams - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; @@ -538,12 +516,7 @@ namespace edm { serviceToken_, emptyList); - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } beginLumiCalled_ = true; } @@ -573,17 +546,12 @@ namespace edm { ServiceRegistry::Operate operate(serviceToken_); - FinalWaitingTask waitTask; + FinalWaitingTask waitTask{taskGroup_}; EventTransitionInfo info(*pep, esp_->eventSetupImpl()); schedule_->processOneEventAsync(edm::WaitingTaskHolder(taskGroup_, &waitTask), 0, info, serviceToken_); - do { - taskGroup_.wait(); - } while (not waitTask.done()); - if (waitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(waitTask.exceptionPtr()); - } + waitTask.wait(); ++eventNumber_; } @@ -604,7 +572,7 @@ namespace edm { //To wait, the ref count has to be 1+#streams { - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; @@ -616,15 +584,10 @@ namespace edm { emptyList, false); - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } { - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; using Traits = OccurrenceTraits; endGlobalTransitionAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask), @@ -633,12 +596,7 @@ namespace edm { serviceToken_, emptyList, false); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } } return lumiPrincipal; @@ -665,7 +623,7 @@ namespace edm { //To wait, the ref count has to be 1+#streams { - FinalWaitingTask streamLoopWaitTask; + FinalWaitingTask streamLoopWaitTask{taskGroup_}; using Traits = OccurrenceTraits; @@ -677,15 +635,10 @@ namespace edm { emptyList, false); - do { - taskGroup_.wait(); - } while (not streamLoopWaitTask.done()); - if (streamLoopWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(streamLoopWaitTask.exceptionPtr()); - } + streamLoopWaitTask.wait(); } { - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; using Traits = OccurrenceTraits; endGlobalTransitionAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask), @@ -694,12 +647,7 @@ namespace edm { serviceToken_, emptyList, false); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } principalCache_.deleteRun(phid, runNumber_); @@ -714,7 +662,7 @@ namespace edm { std::vector emptyList; { - FinalWaitingTask globalWaitTask; + FinalWaitingTask globalWaitTask{taskGroup_}; ProcessBlockTransitionInfo transitionInfo(processBlockPrincipal); using Traits = OccurrenceTraits; @@ -724,12 +672,7 @@ namespace edm { serviceToken_, emptyList, false); - do { - taskGroup_.wait(); - } while (not globalWaitTask.done()); - if (globalWaitTask.exceptionPtr() != nullptr) { - std::rethrow_exception(globalWaitTask.exceptionPtr()); - } + globalWaitTask.wait(); } } return &processBlockPrincipal;