From 91996041cbcc1abbef61737c32628142498cb979 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Wed, 21 Jul 2021 13:24:43 -0500 Subject: [PATCH 1/2] Extend global::OutputModule to support ExternalWork --- .../interface/global/OutputModuleBase.h | 7 +++++- .../global/outputmoduleAbilityToImplementor.h | 23 +++++++++++++++++++ FWCore/Framework/src/Worker.h | 22 ++++++++++++++---- FWCore/Framework/src/WorkerT.cc | 7 ++++++ .../Framework/src/global/OutputModuleBase.cc | 11 +++++++++ 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/FWCore/Framework/interface/global/OutputModuleBase.h b/FWCore/Framework/interface/global/OutputModuleBase.h index fd3896630d17f..bee9eda7130d9 100644 --- a/FWCore/Framework/interface/global/OutputModuleBase.h +++ b/FWCore/Framework/interface/global/OutputModuleBase.h @@ -132,6 +132,10 @@ namespace edm { void doEndStream(StreamID id) { doEndStream_(id); } bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*); + void doAcquire(EventTransitionInfo const&, + ActivityRegistry*, + ModuleCallingContext const*, + WaitingTaskWithArenaHolder&); //For now this is a placeholder /*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const& iModuleCallingContext, @@ -264,10 +268,11 @@ namespace edm { virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {} virtual void doRespondToOpenInputFile_(FileBlock const&) {} virtual void doRespondToCloseInputFile_(FileBlock const&) {} + virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {} virtual void setProcessesWithSelectedMergeableRunProducts(std::set const&) {} - bool hasAcquire() const { return false; } + virtual bool hasAcquire() const { return false; } bool hasAccumulator() const { return false; } void keepThisBranch(BranchDescription const& desc, diff --git a/FWCore/Framework/interface/global/outputmoduleAbilityToImplementor.h b/FWCore/Framework/interface/global/outputmoduleAbilityToImplementor.h index 78d37717c66a0..9d57a1eb276fb 100644 --- a/FWCore/Framework/interface/global/outputmoduleAbilityToImplementor.h +++ b/FWCore/Framework/interface/global/outputmoduleAbilityToImplementor.h @@ -130,6 +130,24 @@ namespace edm { std::unique_ptr[]> caches_; }; + template + class ExternalWork : public virtual T { + public: + ExternalWork(edm::ParameterSet const& iPSet) : OutputModuleBase(iPSet) {} + ExternalWork(ExternalWork const&) = delete; + ExternalWork& operator=(ExternalWork const&) = delete; + ~ExternalWork() noexcept(false) override{}; + + private: + bool hasAcquire() const override { return true; } + + void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskWithArenaHolder& holder) final { + acquire(id, event, holder); + } + + virtual void acquire(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder) const = 0; + }; + template struct AbilityToImplementor; @@ -153,6 +171,11 @@ namespace edm { typedef edm::global::outputmodule::StreamCacheHolder Type; }; + template <> + struct AbilityToImplementor { + typedef edm::global::outputmodule::ExternalWork Type; + }; + } // namespace outputmodule } // namespace global } // namespace edm diff --git a/FWCore/Framework/src/Worker.h b/FWCore/Framework/src/Worker.h index abd41a7922f1a..048a134dbad3a 100644 --- a/FWCore/Framework/src/Worker.h +++ b/FWCore/Framework/src/Worker.h @@ -954,7 +954,7 @@ namespace edm { if (workerhelper::CallImpl::needToRunSelection(this)) { //We need to run the selection in a different task so that // we can prefetch the data needed for the selection - auto runTask = + WaitingTask* moduleTask = new RunModuleTask(this, transitionInfo, token, streamID, parentContext, context, task.group()); //make sure the task is either run or destroyed @@ -973,14 +973,28 @@ namespace edm { private: std::atomic m_task; }; + if constexpr (T::isEvent_) { + if (hasAcquire()) { + auto ownRunTask = std::make_shared(moduleTask); + ServiceWeakToken weakToken = token; + auto* group = task.group(); + moduleTask = make_waiting_task( + [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) { + WaitingTaskWithArenaHolder runTaskHolder( + *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext)); + AcquireTask t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder); + t.execute(); + }); + } + } auto* group = task.group(); - auto ownRunTask = std::make_shared(runTask); + auto ownModuleTask = std::make_shared(moduleTask); ServiceWeakToken weakToken = token; auto selectionTask = - make_waiting_task([ownRunTask, parentContext, info = transitionInfo, weakToken, group, this]( + make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this]( std::exception_ptr const*) mutable { ServiceRegistry::Operate guard(weakToken.lock()); - prefetchAsync(WaitingTaskHolder(*group, ownRunTask->release()), + prefetchAsync(WaitingTaskHolder(*group, ownModuleTask->release()), weakToken.lock(), parentContext, info, diff --git a/FWCore/Framework/src/WorkerT.cc b/FWCore/Framework/src/WorkerT.cc index aed080e0bc5b0..1b06d94ea9860 100644 --- a/FWCore/Framework/src/WorkerT.cc +++ b/FWCore/Framework/src/WorkerT.cc @@ -247,6 +247,13 @@ namespace edm { module_->doAcquire(info, activityRegistry(), mcc, holder); } + template <> + inline void WorkerT::implDoAcquire(EventTransitionInfo const& info, + ModuleCallingContext const* mcc, + WaitingTaskWithArenaHolder& holder) { + module_->doAcquire(info, activityRegistry(), mcc, holder); + } + template <> inline void WorkerT::implDoAcquire(EventTransitionInfo const& info, ModuleCallingContext const* mcc, diff --git a/FWCore/Framework/src/global/OutputModuleBase.cc b/FWCore/Framework/src/global/OutputModuleBase.cc index 5ab7c2a2aec0e..19dd868d6de41 100644 --- a/FWCore/Framework/src/global/OutputModuleBase.cc +++ b/FWCore/Framework/src/global/OutputModuleBase.cc @@ -30,6 +30,7 @@ #include "FWCore/Framework/interface/TriggerNamesService.h" #include "FWCore/Framework/src/EventSignalsSentry.h" #include "FWCore/Framework/src/PreallocationConfiguration.h" +#include "FWCore/Framework/src/EventAcquireSignalsSentry.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" @@ -276,6 +277,16 @@ namespace edm { return true; } + void OutputModuleBase::doAcquire(EventTransitionInfo const& info, + ActivityRegistry* act, + ModuleCallingContext const* mcc, + WaitingTaskWithArenaHolder& holder) { + EventForOutput e(info, moduleDescription_, mcc); + e.setConsumer(this); + EventAcquireSignalsSentry sentry(act, mcc); + this->doAcquire_(e.streamID(), e, holder); + } + bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { RunForOutput r(info, moduleDescription_, mcc, false); r.setConsumer(this); From 796eba85673b4c0d8386a44bf3e221832f4ebe1a Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Wed, 21 Jul 2021 14:46:32 -0500 Subject: [PATCH 2/2] Add unit test for global::OutputModule --- FWCore/Framework/test/BuildFile.xml | 1 + .../Framework/test/stubs/TestFilterModule.cc | 63 +++++++++++++++++++ .../testExternalWorkGlobalOutputModule_cfg.py | 50 +++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 FWCore/Framework/test/testExternalWorkGlobalOutputModule_cfg.py diff --git a/FWCore/Framework/test/BuildFile.xml b/FWCore/Framework/test/BuildFile.xml index f6ed0840fc9bc..970412381d6a9 100644 --- a/FWCore/Framework/test/BuildFile.xml +++ b/FWCore/Framework/test/BuildFile.xml @@ -395,3 +395,4 @@ + diff --git a/FWCore/Framework/test/stubs/TestFilterModule.cc b/FWCore/Framework/test/stubs/TestFilterModule.cc index 852302c39cf56..72b10cc8a347d 100644 --- a/FWCore/Framework/test/stubs/TestFilterModule.cc +++ b/FWCore/Framework/test/stubs/TestFilterModule.cc @@ -2,6 +2,7 @@ #include "FWCore/Framework/interface/one/EDAnalyzer.h" #include "FWCore/Framework/interface/one/EDFilter.h" #include "FWCore/Framework/interface/one/OutputModule.h" +#include "FWCore/Framework/interface/global/OutputModule.h" #include "FWCore/Framework/interface/global/EDAnalyzer.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" @@ -88,6 +89,25 @@ namespace edmtest { int total_; }; + class ExternalWorkSewerModule : public edm::global::OutputModule { + public: + explicit ExternalWorkSewerModule(edm::ParameterSet const&); + + static void fillDescriptions(ConfigurationDescriptions& descriptions); + + private: + void write(edm::EventForOutput const& e) override; + void acquire(edm::StreamID, edm::EventForOutput const& e, edm::WaitingTaskWithArenaHolder) const override; + void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override {} + void writeRun(edm::RunForOutput const&) override {} + void endJob() override; + + const std::string name_; + const int num_pass_; + mutable std::atomic total_; + mutable std::atomic totalAcquire_; + }; + // ----------------------------------------------------------------- TestResultAnalyzer::TestResultAnalyzer(edm::ParameterSet const& ps) @@ -202,8 +222,50 @@ namespace edmtest { descriptions.add("sewerModule", desc); } + // --------- + + ExternalWorkSewerModule::ExternalWorkSewerModule(edm::ParameterSet const& ps) + : edm::global::OutputModuleBase::OutputModuleBase(ps), + edm::global::OutputModule(ps), + name_(ps.getParameter("name")), + num_pass_(ps.getParameter("shouldPass")), + total_(0), + totalAcquire_(0) {} + + void ExternalWorkSewerModule::acquire(edm::StreamID, + edm::EventForOutput const&, + WaitingTaskWithArenaHolder task) const { + ++totalAcquire_; + } + void ExternalWorkSewerModule::write(edm::EventForOutput const&) { ++total_; } + + void ExternalWorkSewerModule::endJob() { + std::cerr << "EXTERNALWORKSEWERMODULE " << name_ << ": should pass " << num_pass_ << ", did pass " << total_.load() + << " with acquire " << totalAcquire_.load() << "\n"; + + if (total_.load() != num_pass_) { + std::cerr << "number passed should be " << num_pass_ << ", but got " << total_.load() << "\n"; + abort(); + } + + if (total_.load() != totalAcquire_.load()) { + std::cerr << "write() called " << total_.load() << ", but acquire called " << totalAcquire_.load() << "\n"; + abort(); + } + } + + void ExternalWorkSewerModule::fillDescriptions(ConfigurationDescriptions& descriptions) { + ParameterSetDescription desc; + desc.setComment("Tracks number of times the write and acquire methods are called."); + desc.add("name")->setComment("name used in printout"); + desc.add("shouldPass")->setComment("number of times write/acquire should be called"); + edm::one::OutputModule<>::fillDescription(desc, std::vector(1U, std::string("drop *"))); + descriptions.add("externalWorkSewerModule", desc); + } + } // namespace edmtest +using edmtest::ExternalWorkSewerModule; using edmtest::SewerModule; using edmtest::TestContextAnalyzer; using edmtest::TestFilterModule; @@ -212,4 +274,5 @@ using edmtest::TestResultAnalyzer; DEFINE_FWK_MODULE(TestFilterModule); DEFINE_FWK_MODULE(TestResultAnalyzer); DEFINE_FWK_MODULE(SewerModule); +DEFINE_FWK_MODULE(ExternalWorkSewerModule); DEFINE_FWK_MODULE(TestContextAnalyzer); diff --git a/FWCore/Framework/test/testExternalWorkGlobalOutputModule_cfg.py b/FWCore/Framework/test/testExternalWorkGlobalOutputModule_cfg.py new file mode 100644 index 0000000000000..66c5be12b8179 --- /dev/null +++ b/FWCore/Framework/test/testExternalWorkGlobalOutputModule_cfg.py @@ -0,0 +1,50 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("PROD") + +import FWCore.Framework.test.cmsExceptionsFatalOption_cff +process.options = cms.untracked.PSet( + wantSummary = cms.untracked.bool(True), + Rethrow = FWCore.Framework.test.cmsExceptionsFatalOption_cff.Rethrow +) + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(99) +) + +process.source = cms.Source("EmptySource") + +process.f1 = cms.EDFilter("TestFilterModule", + acceptValue = cms.untracked.int32(30), + onlyOne = cms.untracked.bool(True) +) + +process.outp = cms.OutputModule("ExternalWorkSewerModule", + shouldPass = cms.int32(3), + name = cms.string('for_p'), + SelectEvents = cms.untracked.PSet( + SelectEvents = cms.vstring('p') + ) +) + +process.outNone = cms.OutputModule("ExternalWorkSewerModule", + shouldPass = cms.int32(99), + name = cms.string('for_none') +) + +process.outpempty = cms.OutputModule("ExternalWorkSewerModule", + shouldPass = cms.int32(99), + name = cms.string('pEmpty'), + SelectEvents = cms.untracked.PSet( + SelectEvents = cms.vstring('pEmpty') + ) +) + +process.pEmpty = cms.Path() +process.p = cms.Path(process.f1) + +process.e1 = cms.EndPath(process.outp) +process.e2 = cms.EndPath(process.outNone) +process.e3 = cms.EndPath(process.outpempty) + +