From 9b93b8af27bab9918315f2131d414240070d5b4f Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 26 Aug 2013 15:40:22 -0500 Subject: [PATCH 01/10] Allow EventPrincipal to create its own StreamID --- FWCore/Utilities/interface/StreamID.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/FWCore/Utilities/interface/StreamID.h b/FWCore/Utilities/interface/StreamID.h index 68f8970a95b3c..34da9e0b818c2 100644 --- a/FWCore/Utilities/interface/StreamID.h +++ b/FWCore/Utilities/interface/StreamID.h @@ -25,7 +25,7 @@ // forward declarations namespace edm { class Schedule; - class EventProcessor; + class EventPrincipal; class StreamID { @@ -52,7 +52,7 @@ namespace edm { private: ///Only a Schedule is allowed to create one of these friend class Schedule; - friend class EventProcessor; + friend class EventPrincipal; explicit StreamID(unsigned int iValue) : value_(iValue) {} StreamID() = delete; From 6a8017f66e9c73731df5ae576d6b813674b8904d Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 26 Aug 2013 15:41:22 -0500 Subject: [PATCH 02/10] Can now create its own StreamID so just pass the index to the constructor --- FWCore/Framework/interface/EventPrincipal.h | 2 +- FWCore/Framework/src/EventPrincipal.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/FWCore/Framework/interface/EventPrincipal.h b/FWCore/Framework/interface/EventPrincipal.h index eaa694352fa90..9cee1c4d12716 100644 --- a/FWCore/Framework/interface/EventPrincipal.h +++ b/FWCore/Framework/interface/EventPrincipal.h @@ -52,7 +52,7 @@ namespace edm { boost::shared_ptr branchIDListHelper, ProcessConfiguration const& pc, HistoryAppender* historyAppender, - StreamID const& streamID = StreamID::invalidStreamID()); + unsigned int streamIndex = 0); ~EventPrincipal() {} void fillEventPrincipal(EventAuxiliary const& aux, diff --git a/FWCore/Framework/src/EventPrincipal.cc b/FWCore/Framework/src/EventPrincipal.cc index 70246f71621bc..38a269e8544d9 100644 --- a/FWCore/Framework/src/EventPrincipal.cc +++ b/FWCore/Framework/src/EventPrincipal.cc @@ -23,7 +23,7 @@ namespace edm { boost::shared_ptr branchIDListHelper, ProcessConfiguration const& pc, HistoryAppender* historyAppender, - StreamID const& streamID) : + unsigned int streamIndex) : Base(reg, reg->productLookup(InEvent), pc, InEvent, historyAppender), aux_(), luminosityBlockPrincipal_(), @@ -34,7 +34,7 @@ namespace edm { branchIDListHelper_(branchIDListHelper), branchListIndexes_(new BranchListIndexes), branchListIndexToProcessIndex_(), - streamID_(streamID){} + streamID_(streamIndex){} void EventPrincipal::clearEventPrincipal() { From ee4a38e63228b9dbc44d53c5837663d2d2a783d7 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 26 Aug 2013 15:52:20 -0500 Subject: [PATCH 03/10] PrincipalCache now supports multiple concurrent EventPrincipals --- FWCore/Framework/src/EventProcessor.cc | 28 ++++++++++++++------------ FWCore/Framework/src/PrincipalCache.cc | 19 ++++++++++++----- FWCore/Framework/src/PrincipalCache.h | 12 +++++++---- FWCore/Framework/src/SubProcess.cc | 12 +++++++---- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 4fcd2fd6b435b..40e80815575cb 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -677,14 +677,16 @@ namespace edm { FDEBUG(2) << parameterSet << std::endl; - // Reusable event principal - boost::shared_ptr ep(new EventPrincipal(preg_, - branchIDListHelper_, - *processConfiguration_, - historyAppender_.get(), - StreamID{0})); - principalCache_.insert(ep); - + principalCache_.setNumberOfConcurrentPrincipals(preallocations_); + for(unsigned int index = 0; index ep(new EventPrincipal(preg_, + branchIDListHelper_, + *processConfiguration_, + historyAppender_.get(), + index)); + principalCache_.insert(ep,index); + } // initialize the subprocess, if there is one if(subProcessParameterSet) { subProcess_.reset(new SubProcess(*subProcessParameterSet, *parameterSet, preg_, branchIDListHelper_, *espController_, *actReg_, token, serviceregistry::kConfigurationOverrides, preallocations_, &processContext_)); @@ -1701,7 +1703,7 @@ namespace edm { if(size < preg_->size()) { principalCache_.adjustIndexesAfterProductRegistryAddition(); } - principalCache_.adjustEventToNewProductRegistry(preg_); + principalCache_.adjustEventsToNewProductRegistry(preg_); } } itemType = (more ? input_->nextItemType() : InputSource::IsStop); @@ -1884,7 +1886,7 @@ namespace edm { if(size < preg_->size()) { principalCache_.adjustIndexesAfterProductRegistryAddition(); } - principalCache_.adjustEventToNewProductRegistry(preg_); + principalCache_.adjustEventsToNewProductRegistry(preg_); if(numberOfForkedChildren_ > 0) { fb_->setNotFastClonable(FileBlock::ParallelProcesses); } @@ -2198,8 +2200,8 @@ namespace edm { void EventProcessor::readAndProcessEvent() { //TODO this will have to become per stream - StreamContext streamContext(StreamID{0}, &processContext_); - EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal(), &streamContext); + StreamContext streamContext(principalCache_.eventPrincipal(0).streamID(), &processContext_); + EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal(0), &streamContext); FDEBUG(1) << "\treadEvent\n"; assert(pep != 0); pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr()); @@ -2227,7 +2229,7 @@ namespace edm { EDLooperBase::Status status = EDLooperBase::kContinue; do { - StreamContext streamContext(StreamID{0}, &processContext_); + StreamContext streamContext(pep->streamID(), &processContext_); status = looper_->doDuringLoop(*pep, esp_->eventSetup(), pc, &streamContext); bool succeeded = true; diff --git a/FWCore/Framework/src/PrincipalCache.cc b/FWCore/Framework/src/PrincipalCache.cc index 61ca77b444055..af70b19e4d903 100644 --- a/FWCore/Framework/src/PrincipalCache.cc +++ b/FWCore/Framework/src/PrincipalCache.cc @@ -3,6 +3,7 @@ #include "FWCore/Framework/interface/EventPrincipal.h" #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" #include "FWCore/Framework/interface/RunPrincipal.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/Utilities/interface/EDMException.h" #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" @@ -15,6 +16,12 @@ namespace edm { PrincipalCache::~PrincipalCache() { } + + void PrincipalCache::setNumberOfConcurrentPrincipals(PreallocationConfiguration const& iConfig) + { + eventPrincipals_.resize(iConfig.numberOfStreams()); + } + RunPrincipal& PrincipalCache::runPrincipal(ProcessHistoryID const& phid, RunNumber_t run) const { if (phid != reducedInputProcessHistoryID_ || @@ -239,11 +246,13 @@ namespace edm { lumiPrincipal_.reset(); } - void PrincipalCache::adjustEventToNewProductRegistry(boost::shared_ptr reg) { - if (eventPrincipal_) { - eventPrincipal_->adjustIndexesAfterProductRegistryAddition(); - bool eventOK = eventPrincipal_->adjustToNewProductRegistry(*reg); - assert(eventOK); + void PrincipalCache::adjustEventsToNewProductRegistry(boost::shared_ptr reg) { + for(auto &eventPrincipal : eventPrincipals_) { + if (eventPrincipal) { + eventPrincipal->adjustIndexesAfterProductRegistryAddition(); + bool eventOK = eventPrincipal->adjustToNewProductRegistry(*reg); + assert(eventOK); + } } } diff --git a/FWCore/Framework/src/PrincipalCache.h b/FWCore/Framework/src/PrincipalCache.h index a76a7d63d3bba..4c29c58b154a0 100644 --- a/FWCore/Framework/src/PrincipalCache.h +++ b/FWCore/Framework/src/PrincipalCache.h @@ -28,6 +28,8 @@ Original Author: W. David Dagenhart #include "DataFormats/Provenance/interface/LuminosityBlockID.h" #include "boost/shared_ptr.hpp" +#include +#include namespace edm { @@ -37,6 +39,7 @@ namespace edm { class RunAuxiliary; class LuminosityBlockAuxiliary; class ProductRegistry; + class PreallocationConfiguration; class PrincipalCache { public: @@ -56,19 +59,20 @@ namespace edm { boost::shared_ptr const& lumiPrincipalPtr() const; bool hasLumiPrincipal() const {return lumiPrincipal_;} - EventPrincipal& eventPrincipal() const { return *eventPrincipal_; } + EventPrincipal& eventPrincipal(unsigned int iStreamIndex) const { return *(eventPrincipals_[iStreamIndex]); } void merge(boost::shared_ptr aux, boost::shared_ptr reg); void merge(boost::shared_ptr aux, boost::shared_ptr reg); + void setNumberOfConcurrentPrincipals(PreallocationConfiguration const&); void insert(boost::shared_ptr rp); void insert(boost::shared_ptr lbp); - void insert(boost::shared_ptr ep) { eventPrincipal_ = ep; } + void insert(boost::shared_ptr ep, unsigned int iStreamIndex) { assert(iStreamIndex < eventPrincipals_.size()); eventPrincipals_[iStreamIndex] = ep; } void deleteRun(ProcessHistoryID const& phid, RunNumber_t run); void deleteLumi(ProcessHistoryID const& phid, RunNumber_t run, LuminosityBlockNumber_t lumi); - void adjustEventToNewProductRegistry(boost::shared_ptr reg); + void adjustEventsToNewProductRegistry(boost::shared_ptr reg); void adjustIndexesAfterProductRegistryAddition(); @@ -81,7 +85,7 @@ namespace edm { // lumi, or event boost::shared_ptr runPrincipal_; boost::shared_ptr lumiPrincipal_; - boost::shared_ptr eventPrincipal_; + std::vector> eventPrincipals_; // These are intentionally not cleared so that when inserting // the next principal the conversion from full ProcessHistoryID_ diff --git a/FWCore/Framework/src/SubProcess.cc b/FWCore/Framework/src/SubProcess.cc index c309401b6b903..2b0406d5ebfe3 100644 --- a/FWCore/Framework/src/SubProcess.cc +++ b/FWCore/Framework/src/SubProcess.cc @@ -22,6 +22,7 @@ #include "FWCore/Framework/interface/TriggerNamesService.h" #include "FWCore/Framework/src/EventSetupsController.h" #include "FWCore/Framework/src/SignallingProductRegistry.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/ParameterSet/interface/IllegalParameters.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/Utilities/interface/ExceptionCollector.h" @@ -139,9 +140,12 @@ namespace edm { processContext_.setProcessConfiguration(processConfiguration_.get()); processContext_.setParentProcessContext(parentProcessContext); - boost::shared_ptr ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get(), - StreamID::invalidStreamID())); - principalCache_.insert(ep); + principalCache_.setNumberOfConcurrentPrincipals(preallocConfig); + for(unsigned int index = 0; index < preallocConfig.numberOfStreams(); ++index) { + boost::shared_ptr ep(new EventPrincipal(preg_, branchIDListHelper_, *processConfiguration_, historyAppender_.get(), + index)); + principalCache_.insert(ep,index); + } if(subProcessParameterSet) { subProcess_.reset(new SubProcess(*subProcessParameterSet, topLevelParameterSet, preg_, branchIDListHelper_, esController, *items.actReg_, newToken, iLegacy, preallocConfig, &processContext_)); @@ -281,7 +285,7 @@ namespace edm { esids->push_back(selector_config_id_); } - EventPrincipal& ep = principalCache_.eventPrincipal(); + EventPrincipal& ep = principalCache_.eventPrincipal(principal.streamID().value()); ep.setStreamID(principal.streamID()); ep.fillEventPrincipal(aux, esids, From 1e6e952092c4fa6733312eee8aada1dd315f920e Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Tue, 27 Aug 2013 09:54:22 -0500 Subject: [PATCH 04/10] Do not update EventSetup on event transitions --- FWCore/Framework/interface/SubProcess.h | 4 ++-- FWCore/Framework/src/EventProcessor.cc | 17 ++++++++++++----- FWCore/Framework/src/SubProcess.cc | 10 +++++----- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/FWCore/Framework/interface/SubProcess.h b/FWCore/Framework/interface/SubProcess.h index aeeba819e8bb6..443a14884a02f 100644 --- a/FWCore/Framework/interface/SubProcess.h +++ b/FWCore/Framework/interface/SubProcess.h @@ -54,7 +54,7 @@ namespace edm { void doBeginJob(); void doEndJob(); - void doEvent(EventPrincipal const& principal, IOVSyncValue const& ts); + void doEvent(EventPrincipal const& principal); void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts); @@ -200,7 +200,7 @@ namespace edm { private: void beginJob(); void endJob(); - void process(EventPrincipal const& e, IOVSyncValue const& ts); + void process(EventPrincipal const& e); void beginRun(RunPrincipal const& r, IOVSyncValue const& ts); void endRun(RunPrincipal const& r, IOVSyncValue const& ts, bool cleaningUpAfterException); void beginLuminosityBlock(LuminosityBlockPrincipal const& lb, IOVSyncValue const& ts); diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 40e80815575cb..ac48d2793550e 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -619,8 +619,6 @@ namespace edm { //bad } */ - preallocations_ = PreallocationConfiguration(nThreads,nStreams,nConcurrentLumis,nConcurrentRuns); - //forking ParameterSet const& forking = optionsPset.getUntrackedParameterSet("multiProcesses", ParameterSet()); numberOfForkedChildren_ = forking.getUntrackedParameter("maxChildProcesses", 0); @@ -659,7 +657,14 @@ namespace edm { if(looper_) { looper_->setActionTable(items.act_table_.get()); looper_->attachTo(*items.actReg_); + + //For now loopers make us run only 1 transition at a time + nStreams=1; + nConcurrentLumis=1; + nConcurrentRuns=1; } + + preallocations_ = PreallocationConfiguration(nThreads,nStreams,nConcurrentLumis,nConcurrentRuns); // initialize the input source input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_); @@ -2209,17 +2214,19 @@ namespace edm { assert(principalCache_.lumiPrincipalPtr()->run() == pep->run()); assert(principalCache_.lumiPrincipalPtr()->luminosityBlock() == pep->luminosityBlock()); - IOVSyncValue ts(pep->id(), pep->time()); - espController_->eventSetupForInstance(ts); + //We can only update IOVs on Lumi boundaries + //IOVSyncValue ts(pep->id(), pep->time()); + //espController_->eventSetupForInstance(ts); EventSetup const& es = esp_->eventSetup(); { typedef OccurrenceTraits Traits; schedule_->processOneEvent(0,*pep, es); if(hasSubProcess()) { - subProcess_->doEvent(*pep, ts); + subProcess_->doEvent(*pep); } } + //NOTE: If we have a looper we only have one Stream if(looper_) { bool randomAccess = input_->randomAccess(); ProcessingController::ForwardState forwardState = input_->forwardState(); diff --git a/FWCore/Framework/src/SubProcess.cc b/FWCore/Framework/src/SubProcess.cc index 2b0406d5ebfe3..ea8fb5601c063 100644 --- a/FWCore/Framework/src/SubProcess.cc +++ b/FWCore/Framework/src/SubProcess.cc @@ -257,7 +257,7 @@ namespace edm { } void - SubProcess::doEvent(EventPrincipal const& ep, IOVSyncValue const& ts) { + SubProcess::doEvent(EventPrincipal const& ep) { ServiceRegistry::Operate operate(serviceToken_); /* BEGIN relevant bits from OutputModule::doEvent */ detail::TRBESSentry products_sentry(selectors_); @@ -270,12 +270,12 @@ namespace edm { return; } } - process(ep,ts); + process(ep); /* END relevant bits from OutputModule::doEvent */ } void - SubProcess::process(EventPrincipal const& principal, IOVSyncValue const& ts) { + SubProcess::process(EventPrincipal const& principal) { EventAuxiliary aux(principal.aux()); aux.setProcessHistoryID(principal.processHistoryID()); @@ -295,8 +295,8 @@ namespace edm { ep.setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr()); propagateProducts(InEvent, principal, ep); typedef OccurrenceTraits Traits; - schedule_->processOneEvent(ep.streamID().value(),ep, esp_->eventSetupForInstance(ts)); - if(subProcess_.get()) subProcess_->doEvent(ep, ts); + schedule_->processOneEvent(ep.streamID().value(),ep, esp_->eventSetup()); + if(subProcess_.get()) subProcess_->doEvent(ep); ep.clearEventPrincipal(); } From 724ddf5cc9872ab14fd839e3b1462d756c699f1c Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 28 Aug 2013 17:56:41 -0500 Subject: [PATCH 05/10] Processes a contiguous group of events in an inner loop To support concurrent event processing, we need to isolate the event handling from the rest of the transition handling. This is the first step. --- FWCore/Framework/interface/EventProcessor.h | 9 ++ FWCore/Framework/src/EventProcessor.cc | 101 ++++++++++++++------ 2 files changed, 80 insertions(+), 30 deletions(-) diff --git a/FWCore/Framework/interface/EventProcessor.h b/FWCore/Framework/interface/EventProcessor.h index ae18e4b313054..729f583b10b6e 100644 --- a/FWCore/Framework/interface/EventProcessor.h +++ b/FWCore/Framework/interface/EventProcessor.h @@ -303,6 +303,15 @@ namespace edm { } void possiblyContinueAfterForkChildFailure(); + + //read the next event using Stream iStreamIndex + void readEvent(unsigned int iStreamIndex); + + //process the already read event using Stream iStreamIndex + void processEvent(unsigned int iStreamIndex); + + //returns true if an asynchronous stop was requested + bool checkForAsyncStopRequest(StatusCode&); //------------------------------------------------------------------ // // Data members below. diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index ac48d2793550e..60b314174c8d4 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1673,6 +1673,37 @@ namespace edm { return machine; } + bool + EventProcessor::checkForAsyncStopRequest(StatusCode& returnCode) { + bool returnValue = false; + // These are used for asynchronous running only and + // and are checking to see if stopAsync or shutdownAsync + // were called from another thread. In the future, we + // may need to do something better than polling the state. + // With the current code this is the simplest thing and + // it should always work. If the interaction between + // threads becomes more complex this may cause problems. + if(state_ == sStopping) { + FDEBUG(1) << "In main processing loop, encountered sStopping state\n"; + returnValue = true; + } + else if(state_ == sShuttingDown) { + FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n"; + returnValue = true; + } + + // Look for a shutdown signal + { + boost::mutex::scoped_lock sl(usr2_lock); + if(shutdown_flag) { + changeState(mShutdownSignal); + returnValue = true; + returnCode = epSignal; + } + } + return returnValue; + } + EventProcessor::StatusCode EventProcessor::runToCompletion(bool onlineStateTransitions) { @@ -1715,34 +1746,28 @@ namespace edm { FDEBUG(1) << "itemType = " << itemType << "\n"; - // These are used for asynchronous running only and - // and are checking to see if stopAsync or shutdownAsync - // were called from another thread. In the future, we - // may need to do something better than polling the state. - // With the current code this is the simplest thing and - // it should always work. If the interaction between - // threads becomes more complex this may cause problems. - if(state_ == sStopping) { - FDEBUG(1) << "In main processing loop, encountered sStopping state\n"; - forceLooperToEnd_ = true; - machine->process_event(statemachine::Stop()); - forceLooperToEnd_ = false; - break; - } - else if(state_ == sShuttingDown) { - FDEBUG(1) << "In main processing loop, encountered sShuttingDown state\n"; + if(checkForAsyncStopRequest(returnCode)) { forceLooperToEnd_ = true; machine->process_event(statemachine::Stop()); forceLooperToEnd_ = false; break; } - // Look for a shutdown signal - { - boost::mutex::scoped_lock sl(usr2_lock); - if(shutdown_flag) { - changeState(mShutdownSignal); - returnCode = epSignal; + //While all the following item type are isEvent, process them right here + if(itemType == InputSource::IsEvent and numberOfForkedChildren_ == 0) { + bool asyncStopRequested = false; + do { + readEvent(0); + processEvent(0); + if(shouldWeStop()) { + break; + } + itemType = input_->nextItemType(); + if((asyncStopRequested=checkForAsyncStopRequest(returnCode))) { + break; + } + } while (itemType == InputSource::IsEvent); + if(asyncStopRequested) { forceLooperToEnd_ = true; machine->process_event(statemachine::Stop()); forceLooperToEnd_ = false; @@ -1750,7 +1775,18 @@ namespace edm { } } - if(itemType == InputSource::IsStop) { + if(itemType == InputSource::IsEvent) { + if(numberOfForkedChildren_ > 0) { + machine->process_event(statemachine::Event()); + } else { + //We broke out of the loop early so check + // to see if we signaled for a stop + if(shouldWeStop()) { + machine->process_event(statemachine::Stop()); + } + } + } + else if(itemType == InputSource::IsStop) { machine->process_event(statemachine::Stop()); } else if(itemType == InputSource::IsFile) { @@ -1762,9 +1798,6 @@ namespace edm { else if(itemType == InputSource::IsLumi) { machine->process_event(statemachine::Lumi(input_->luminosityBlock())); } - else if(itemType == InputSource::IsEvent) { - machine->process_event(statemachine::Event()); - } else if(itemType == InputSource::IsSynchronize) { //For now, we don't have to do anything } @@ -2204,11 +2237,19 @@ namespace edm { } void EventProcessor::readAndProcessEvent() { + readEvent(0); + processEvent(0); + } + void EventProcessor::readEvent(unsigned int iStreamIndex) { //TODO this will have to become per stream - StreamContext streamContext(principalCache_.eventPrincipal(0).streamID(), &processContext_); - EventPrincipal *pep = input_->readEvent(principalCache_.eventPrincipal(0), &streamContext); + auto& event = principalCache_.eventPrincipal(iStreamIndex); + StreamContext streamContext(event.streamID(), &processContext_); + input_->readEvent(event, &streamContext); FDEBUG(1) << "\treadEvent\n"; - assert(pep != 0); + } + void EventProcessor::processEvent(unsigned int iStreamIndex) { + auto pep = &(principalCache_.eventPrincipal(iStreamIndex)); + pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr()); assert(pep->luminosityBlockPrincipalPtrValid()); assert(principalCache_.lumiPrincipalPtr()->run() == pep->run()); @@ -2220,7 +2261,7 @@ namespace edm { EventSetup const& es = esp_->eventSetup(); { typedef OccurrenceTraits Traits; - schedule_->processOneEvent(0,*pep, es); + schedule_->processOneEvent(iStreamIndex,*pep, es); if(hasSubProcess()) { subProcess_->doEvent(*pep); } From 15a4d3c59e663dcf30f357b44b4b00ab80e9ed59 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 29 Aug 2013 11:04:20 -0500 Subject: [PATCH 06/10] Moved continuous loop over events into method called from state machine Since the state machine makes certain useful guarantees when doing an Event transition, I moved the contiguous event loop into the method called by the state machine when an event in encountered. This will later change to be where the different Stream tasks are launched. --- FWCore/Framework/interface/EventProcessor.h | 5 ++ FWCore/Framework/src/EventProcessor.cc | 71 ++++++++++++--------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/FWCore/Framework/interface/EventProcessor.h b/FWCore/Framework/interface/EventProcessor.h index 729f583b10b6e..2e9c5d5511eb1 100644 --- a/FWCore/Framework/interface/EventProcessor.h +++ b/FWCore/Framework/interface/EventProcessor.h @@ -14,6 +14,7 @@ configured in the user's main() function, and is set running. #include "FWCore/Framework/interface/Frameworkfwd.h" #include "FWCore/Framework/interface/IEventProcessor.h" +#include "FWCore/Framework/interface/InputSource.h" #include "FWCore/Framework/src/PrincipalCache.h" #include "FWCore/Framework/src/SignallingProductRegistry.h" #include "FWCore/Framework/src/PreallocationConfiguration.h" @@ -369,6 +370,10 @@ namespace edm { PreallocationConfiguration preallocations_; + bool asyncStopRequestedWhileProcessingEvents_; + InputSource::ItemType nextItemTypeFromProcessingEvents_; + StatusCode asyncStopStatusCodeFromProcessingEvents_; + typedef std::set > ExcludedData; typedef std::map ExcludedDataMap; ExcludedDataMap eventSetupDataToExcludeFromPrefetching_; diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 60b314174c8d4..8f4328699cbdb 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -445,7 +445,10 @@ namespace edm { numberOfForkedChildren_(0), numberOfSequentialEventsPerChild_(1), setCpuAffinity_(false), - eventSetupDataToExcludeFromPrefetching_() { + asyncStopRequestedWhileProcessingEvents_(false), + nextItemTypeFromProcessingEvents_(InputSource::IsEvent), + eventSetupDataToExcludeFromPrefetching_() + { boost::shared_ptr parameterSet = PythonProcessDesc(config).parameterSet(); boost::shared_ptr processDesc(new ProcessDesc(parameterSet)); processDesc->addServices(defaultServices, forcedServices); @@ -496,7 +499,10 @@ namespace edm { numberOfForkedChildren_(0), numberOfSequentialEventsPerChild_(1), setCpuAffinity_(false), - eventSetupDataToExcludeFromPrefetching_() { + asyncStopRequestedWhileProcessingEvents_(false), + nextItemTypeFromProcessingEvents_(InputSource::IsEvent), + eventSetupDataToExcludeFromPrefetching_() + { init(processDesc, token, legacy); } @@ -543,7 +549,10 @@ namespace edm { numberOfForkedChildren_(0), numberOfSequentialEventsPerChild_(1), setCpuAffinity_(false), - eventSetupDataToExcludeFromPrefetching_() { + asyncStopRequestedWhileProcessingEvents_(false), + nextItemTypeFromProcessingEvents_(InputSource::IsEvent), + eventSetupDataToExcludeFromPrefetching_() +{ if(isPython) { boost::shared_ptr parameterSet = PythonProcessDesc(config).parameterSet(); boost::shared_ptr processDesc(new ProcessDesc(parameterSet)); @@ -1711,6 +1720,7 @@ namespace edm { StateSentry toerror(this); StatusCode returnCode=epSuccess; + asyncStopStatusCodeFromProcessingEvents_=epSuccess; std::auto_ptr machine; { beginJob(); //make sure this was called @@ -1724,6 +1734,8 @@ namespace edm { ServiceRegistry::Operate operate(serviceToken_); machine = createStateMachine(); + nextItemTypeFromProcessingEvents_=InputSource::IsEvent; + asyncStopRequestedWhileProcessingEvents_=false; try { try { @@ -1753,38 +1765,19 @@ namespace edm { break; } - //While all the following item type are isEvent, process them right here - if(itemType == InputSource::IsEvent and numberOfForkedChildren_ == 0) { - bool asyncStopRequested = false; - do { - readEvent(0); - processEvent(0); - if(shouldWeStop()) { - break; - } - itemType = input_->nextItemType(); - if((asyncStopRequested=checkForAsyncStopRequest(returnCode))) { - break; - } - } while (itemType == InputSource::IsEvent); - if(asyncStopRequested) { + if(itemType == InputSource::IsEvent) { + machine->process_event(statemachine::Event()); + if(asyncStopRequestedWhileProcessingEvents_) { forceLooperToEnd_ = true; machine->process_event(statemachine::Stop()); forceLooperToEnd_ = false; + returnCode = asyncStopStatusCodeFromProcessingEvents_; break; } + itemType = nextItemTypeFromProcessingEvents_; } if(itemType == InputSource::IsEvent) { - if(numberOfForkedChildren_ > 0) { - machine->process_event(statemachine::Event()); - } else { - //We broke out of the loop early so check - // to see if we signaled for a stop - if(shouldWeStop()) { - machine->process_event(statemachine::Stop()); - } - } } else if(itemType == InputSource::IsStop) { machine->process_event(statemachine::Stop()); @@ -2237,8 +2230,28 @@ namespace edm { } void EventProcessor::readAndProcessEvent() { - readEvent(0); - processEvent(0); + if(numberOfForkedChildren_>0) { + readEvent(0); + processEvent(0); + return; + } + InputSource::ItemType itemType = InputSource::IsEvent; + + //While all the following item types are isEvent, process them right here + asyncStopRequestedWhileProcessingEvents_ = false; + do { + readEvent(0); + processEvent(0); + + if(shouldWeStop()) { + break; + } + itemType = input_->nextItemType(); + if((asyncStopRequestedWhileProcessingEvents_=checkForAsyncStopRequest(asyncStopStatusCodeFromProcessingEvents_))) { + break; + } + } while (itemType == InputSource::IsEvent); + nextItemTypeFromProcessingEvents_ = itemType; } void EventProcessor::readEvent(unsigned int iStreamIndex) { //TODO this will have to become per stream From 6a5b00872f5e2b5e1df46308cac7dc5dc533b218 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 29 Aug 2013 17:33:03 -0500 Subject: [PATCH 07/10] Percolated PreallocationConfiguration down to WorkerParams Stream modules and modules which have Run, Lumi or Stream caches needs to know how many 'slots' to reserve. This gets the information down to where modules are constructed. --- FWCore/Framework/interface/WorkerManager.h | 15 ++++--- FWCore/Framework/src/GlobalSchedule.cc | 3 +- FWCore/Framework/src/GlobalSchedule.h | 1 + FWCore/Framework/src/MakeModuleParams.h | 8 +++- FWCore/Framework/src/Schedule.cc | 20 ++++----- FWCore/Framework/src/StreamSchedule.cc | 42 ++++++++++--------- FWCore/Framework/src/StreamSchedule.h | 5 +++ FWCore/Framework/src/WorkerManager.cc | 18 ++++---- FWCore/Framework/src/WorkerParams.h | 10 +++-- FWCore/Framework/src/WorkerRegistry.cc | 2 +- .../edproducer_productregistry_callback.cc | 34 ++++++++------- FWCore/Framework/test/maker2_t.cppunit.cc | 6 ++- 12 files changed, 97 insertions(+), 67 deletions(-) diff --git a/FWCore/Framework/interface/WorkerManager.h b/FWCore/Framework/interface/WorkerManager.h index 287fa888bead1..288e14ffc37a4 100644 --- a/FWCore/Framework/interface/WorkerManager.h +++ b/FWCore/Framework/interface/WorkerManager.h @@ -24,6 +24,7 @@ namespace edm { class StreamID; class StreamContext; class ModuleRegistry; + class PreallocationConfiguration; class WorkerManager { public: @@ -35,12 +36,13 @@ namespace edm { boost::shared_ptr actReg, ExceptionToActionTable const& actions); void addToUnscheduledWorkers(ParameterSet& pset, - ProductRegistry& preg, - boost::shared_ptr processConfiguration, - std::string label, - bool useStopwatch, - std::set& unscheduledLabels, - std::vector& shouldBeUsedLabels); + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + boost::shared_ptr processConfiguration, + std::string label, + bool useStopwatch, + std::set& unscheduledLabels, + std::vector& shouldBeUsedLabels); void setOnDemandProducts(ProductRegistry& pregistry, std::set const& unscheduledLabels) const; @@ -67,6 +69,7 @@ namespace edm { Worker* getWorker(ParameterSet& pset, ProductRegistry& preg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, std::string const& label); diff --git a/FWCore/Framework/src/GlobalSchedule.cc b/FWCore/Framework/src/GlobalSchedule.cc index ec48d4252e8a0..0c15507dab492 100644 --- a/FWCore/Framework/src/GlobalSchedule.cc +++ b/FWCore/Framework/src/GlobalSchedule.cc @@ -21,6 +21,7 @@ namespace edm { std::vector const& iModulesToUse, ParameterSet& proc_pset, ProductRegistry& pregistry, + PreallocationConfiguration const& prealloc, ExceptionToActionTable const& actions, boost::shared_ptr areg, boost::shared_ptr processConfiguration, @@ -40,7 +41,7 @@ namespace edm { assert(isTracked); //side effect keeps this module around - addToAllWorkers(workerManager_.getWorker(*modpset, pregistry, processConfiguration, moduleLabel)); + addToAllWorkers(workerManager_.getWorker(*modpset, pregistry, &prealloc,processConfiguration, moduleLabel)); } if(inserter) { diff --git a/FWCore/Framework/src/GlobalSchedule.h b/FWCore/Framework/src/GlobalSchedule.h index 35cc838da9d9b..c837726517b54 100644 --- a/FWCore/Framework/src/GlobalSchedule.h +++ b/FWCore/Framework/src/GlobalSchedule.h @@ -72,6 +72,7 @@ namespace edm { std::vector const& modulesToUse, ParameterSet& proc_pset, ProductRegistry& pregistry, + PreallocationConfiguration const& prealloc, ExceptionToActionTable const& actions, boost::shared_ptr areg, boost::shared_ptr processConfiguration, diff --git a/FWCore/Framework/src/MakeModuleParams.h b/FWCore/Framework/src/MakeModuleParams.h index bfc5f68ca1df2..e42aada5d3ea7 100644 --- a/FWCore/Framework/src/MakeModuleParams.h +++ b/FWCore/Framework/src/MakeModuleParams.h @@ -16,21 +16,25 @@ This struct is used to communication parameters into the module factory. namespace edm { class ProcessConfiguration; class ProductRegistry; + class PreallocationConfiguration; struct MakeModuleParams { MakeModuleParams() : - pset_(nullptr), reg_(nullptr), processConfiguration_() + pset_(nullptr), reg_(nullptr), preallocate_(nullptr), processConfiguration_() {} MakeModuleParams(ParameterSet* pset, ProductRegistry& reg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration) : - pset_(pset), + pset_(pset), reg_(®), + preallocate_(prealloc), processConfiguration_(processConfiguration) {} ParameterSet* pset_; ProductRegistry* reg_; + PreallocationConfiguration const* preallocate_; boost::shared_ptr processConfiguration_; }; } diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 47f68407f4a76..19edff2b7d0b9 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -45,7 +45,7 @@ namespace edm { std::shared_ptr makeInserter(ParameterSet& proc_pset, - unsigned int iNStreams, + PreallocationConfiguration const& iPrealloc, ProductRegistry& preg, ExceptionToActionTable const& actions, boost::shared_ptr areg, @@ -54,7 +54,7 @@ namespace edm { ParameterSet* trig_pset = proc_pset.getPSetForUpdate("@trigger_paths"); trig_pset->registerIt(); - WorkerParams work_args(trig_pset, preg, processConfiguration, actions); + WorkerParams work_args(trig_pset, preg, &iPrealloc, processConfiguration, actions); ModuleDescription md(trig_pset->id(), "TriggerResultInserter", "TriggerResults", @@ -62,7 +62,7 @@ namespace edm { ModuleDescription::getUniqueID()); areg->preModuleConstructionSignal_(md); - maker::ModuleHolderT holder(new TriggerResultInserter(*trig_pset, iNStreams),static_cast(nullptr)); + maker::ModuleHolderT holder(new TriggerResultInserter(*trig_pset, iPrealloc.numberOfStreams()),static_cast(nullptr)); holder.setModuleDescription(md); holder.registerProductsAndCallbacks(&preg); areg->postModuleConstructionSignal_(md); @@ -338,19 +338,19 @@ namespace edm { boost::shared_ptr areg, boost::shared_ptr processConfiguration, const ParameterSet* subProcPSet, - PreallocationConfiguration const& config, + PreallocationConfiguration const& prealloc, ProcessContext const* processContext) : //Only create a resultsInserter if there is a trigger path - resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr{} :makeInserter(proc_pset,config.numberOfStreams(),preg,actions,areg,processConfiguration)}, + resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)}, moduleRegistry_(new ModuleRegistry()), all_output_communicators_(), wantSummary_(tns.wantSummary()), endpathsAreActive_(true) { - assert(0{new StreamSchedule{resultsInserter_.get(),moduleRegistry_,proc_pset,tns,preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext}}); + assert(0{new StreamSchedule{resultsInserter_.get(),moduleRegistry_,proc_pset,tns,prealloc, preg,branchIDListHelper,actions,areg,processConfiguration,nullptr==subProcPSet,StreamID{i},processContext}}); } //TriggerResults are injected automatically by StreamSchedules and are @@ -377,7 +377,7 @@ namespace edm { globalSchedule_.reset( new GlobalSchedule{ resultsInserter_.get(), moduleRegistry_, modulesToUse, - proc_pset, preg, + proc_pset, preg, prealloc, actions,areg,processConfiguration,processContext }); //TriggerResults is not in the top level ParameterSet so the call to diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 624736f6c78cc..5a415e512ff4c 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -141,6 +141,7 @@ namespace edm { boost::shared_ptr modReg, ParameterSet& proc_pset, service::TriggerNamesService& tns, + PreallocationConfiguration const& prealloc, ProductRegistry& preg, BranchIDListHelper& branchIDListHelper, ExceptionToActionTable const& actions, @@ -173,7 +174,7 @@ namespace edm { trig_paths_.reserve(trig_name_list_.size()); vstring labelsOnTriggerPaths; for (auto const& trig_name : trig_name_list_) { - fillTrigPath(proc_pset, preg, processConfiguration, trig_bitpos, trig_name, results_, &labelsOnTriggerPaths); + fillTrigPath(proc_pset, preg, &prealloc, processConfiguration, trig_bitpos, trig_name, results_, &labelsOnTriggerPaths); ++trig_bitpos; hasPath = true; } @@ -190,7 +191,7 @@ namespace edm { int bitpos = 0; end_paths_.reserve(end_path_name_list_.size()); for (auto const& end_path_name : end_path_name_list_) { - fillEndPath(proc_pset, preg, processConfiguration, bitpos, end_path_name); + fillEndPath(proc_pset, preg, &prealloc, processConfiguration, bitpos, end_path_name); ++bitpos; } @@ -220,7 +221,7 @@ namespace edm { ParameterSet* modulePSet(proc_pset.getPSetForUpdate(label, isTracked)); assert(isTracked); assert(modulePSet != nullptr); - workerManager_.addToUnscheduledWorkers(*modulePSet, preg, processConfiguration, label, wantSummary_, unscheduledLabels, shouldBeUsedLabels); + workerManager_.addToUnscheduledWorkers(*modulePSet, preg, &prealloc, processConfiguration, label, wantSummary_, unscheduledLabels, shouldBeUsedLabels); } else { //everthing is marked are unused so no 'on demand' allowed shouldBeUsedLabels.push_back(label); @@ -408,12 +409,13 @@ namespace edm { } void StreamSchedule::fillWorkers(ParameterSet& proc_pset, - ProductRegistry& preg, - boost::shared_ptr processConfiguration, - std::string const& name, - bool ignoreFilters, - PathWorkers& out, - vstring* labelsOnPaths) { + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + boost::shared_ptr processConfiguration, + std::string const& name, + bool ignoreFilters, + PathWorkers& out, + vstring* labelsOnPaths) { vstring modnames = proc_pset.getParameter(name); PathWorkers tmpworkers; @@ -443,7 +445,7 @@ namespace edm { } assert(isTracked); - Worker* worker = workerManager_.getWorker(*modpset, preg, processConfiguration, moduleLabel); + Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel); if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType()==Worker::kFilter) { // We have a filter on an end path, and the filter is not explicitly ignored. // See if the filter is allowed. @@ -466,13 +468,14 @@ namespace edm { } void StreamSchedule::fillTrigPath(ParameterSet& proc_pset, - ProductRegistry& preg, - boost::shared_ptr processConfiguration, - int bitpos, std::string const& name, TrigResPtr trptr, - vstring* labelsOnTriggerPaths) { + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + boost::shared_ptr processConfiguration, + int bitpos, std::string const& name, TrigResPtr trptr, + vstring* labelsOnTriggerPaths) { PathWorkers tmpworkers; Workers holder; - fillWorkers(proc_pset, preg, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths); + fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, false, tmpworkers, labelsOnTriggerPaths); for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) { @@ -493,11 +496,12 @@ namespace edm { } void StreamSchedule::fillEndPath(ParameterSet& proc_pset, - ProductRegistry& preg, - boost::shared_ptr processConfiguration, - int bitpos, std::string const& name) { + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + boost::shared_ptr processConfiguration, + int bitpos, std::string const& name) { PathWorkers tmpworkers; - fillWorkers(proc_pset, preg, processConfiguration, name, true, tmpworkers, 0); + fillWorkers(proc_pset, preg, prealloc, processConfiguration, name, true, tmpworkers, 0); Workers holder; for (PathWorkers::iterator wi(tmpworkers.begin()), we(tmpworkers.end()); wi != we; ++wi) { diff --git a/FWCore/Framework/src/StreamSchedule.h b/FWCore/Framework/src/StreamSchedule.h index 0d2ac04d081f6..c222ec04d2cca 100644 --- a/FWCore/Framework/src/StreamSchedule.h +++ b/FWCore/Framework/src/StreamSchedule.h @@ -106,6 +106,7 @@ namespace edm { class TriggerTimingReport; class ModuleRegistry; class TriggerResultInserter; + class PreallocationConfiguration; namespace service { class TriggerNamesService; @@ -150,6 +151,7 @@ namespace edm { boost::shared_ptr, ParameterSet& proc_pset, service::TriggerNamesService& tns, + PreallocationConfiguration const& prealloc, ProductRegistry& pregistry, BranchIDListHelper& branchIDListHelper, ExceptionToActionTable const& actions, @@ -264,16 +266,19 @@ namespace edm { void fillWorkers(ParameterSet& proc_pset, ProductRegistry& preg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, std::string const& name, bool ignoreFilters, PathWorkers& out, vstring* labelsOnPaths); void fillTrigPath(ParameterSet& proc_pset, ProductRegistry& preg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, int bitpos, std::string const& name, TrigResPtr, vstring* labelsOnTriggerPaths); void fillEndPath(ParameterSet& proc_pset, ProductRegistry& preg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, int bitpos, std::string const& name); diff --git a/FWCore/Framework/src/WorkerManager.cc b/FWCore/Framework/src/WorkerManager.cc index cad039abd6a54..0fc0b015b95b3 100644 --- a/FWCore/Framework/src/WorkerManager.cc +++ b/FWCore/Framework/src/WorkerManager.cc @@ -27,23 +27,25 @@ namespace edm { Worker* WorkerManager::getWorker(ParameterSet& pset, ProductRegistry& preg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, std::string const & label) { - WorkerParams params(&pset, preg, processConfiguration, *actionTable_); + WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_); return workerReg_.getWorker(params, label); } void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset, - ProductRegistry& preg, - boost::shared_ptr processConfiguration, - std::string label, - bool useStopwatch, - std::set& unscheduledLabels, - std::vector& shouldBeUsedLabels) { + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + boost::shared_ptr processConfiguration, + std::string label, + bool useStopwatch, + std::set& unscheduledLabels, + std::vector& shouldBeUsedLabels) { //Need to // 1) create worker // 2) if it is a WorkerT, add it to our list - Worker* newWorker = getWorker(pset, preg, processConfiguration, label); + Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label); if(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter) { unscheduledLabels.insert(label); unscheduled_->addWorker(newWorker); diff --git a/FWCore/Framework/src/WorkerParams.h b/FWCore/Framework/src/WorkerParams.h index 662a5f0177c1a..bd2e0c97d44c3 100644 --- a/FWCore/Framework/src/WorkerParams.h +++ b/FWCore/Framework/src/WorkerParams.h @@ -1,5 +1,5 @@ -#ifndef Framework_WorkerParams_h -#define Framework_WorkerParams_h +#ifndef FWCore_Framework_WorkerParams_h +#define FWCore_Framework_WorkerParams_h /** ---------------------- @@ -17,23 +17,27 @@ namespace edm { class ProcessConfiguration; class ProductRegistry; class ExceptionToActionTable; + class PreallocationConfiguration; struct WorkerParams { WorkerParams() : - pset_(nullptr), reg_(nullptr), processConfiguration_(), actions_(nullptr) + pset_(nullptr), reg_(nullptr), preallocate_(nullptr),processConfiguration_(), actions_(nullptr) {} WorkerParams(ParameterSet* pset, ProductRegistry& reg, + PreallocationConfiguration const* prealloc, boost::shared_ptr processConfiguration, ExceptionToActionTable const& actions) : pset_(pset), reg_(®), + preallocate_(prealloc), processConfiguration_(processConfiguration), actions_(&actions) {} ParameterSet* pset_; ProductRegistry* reg_; + PreallocationConfiguration const* preallocate_; boost::shared_ptr processConfiguration_; ExceptionToActionTable const* actions_; }; diff --git a/FWCore/Framework/src/WorkerRegistry.cc b/FWCore/Framework/src/WorkerRegistry.cc index a7502a6c3e8b8..ae339273b1f3e 100644 --- a/FWCore/Framework/src/WorkerRegistry.cc +++ b/FWCore/Framework/src/WorkerRegistry.cc @@ -43,7 +43,7 @@ namespace edm { // if the worker is not there, make it if (workerIt == m_workerMap.end()){ - MakeModuleParams mmp(p.pset_,*p.reg_,p.processConfiguration_); + MakeModuleParams mmp(p.pset_,*p.reg_,p.preallocate_,p.processConfiguration_); auto modulePtr = modRegistry_->getModule(mmp,moduleLabel, actReg_->preModuleConstructionSignal_, actReg_->postModuleConstructionSignal_); diff --git a/FWCore/Framework/test/edproducer_productregistry_callback.cc b/FWCore/Framework/test/edproducer_productregistry_callback.cc index 2759ba5722f55..04999949799c2 100644 --- a/FWCore/Framework/test/edproducer_productregistry_callback.cc +++ b/FWCore/Framework/test/edproducer_productregistry_callback.cc @@ -15,6 +15,7 @@ #include "FWCore/Framework/src/SignallingProductRegistry.h" #include "FWCore/Framework/interface/ConstProductRegistry.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/Framework/interface/EDProducer.h" #include "FWCore/Framework/interface/ExceptionActions.h" @@ -138,13 +139,14 @@ void testEDProducerProductRegistryCallback::testCircularRef() { p2.registerIt(); edm::ExceptionToActionTable table; - + edm::PreallocationConfiguration prealloc; + edm::ParameterSet dummyProcessPset; dummyProcessPset.registerIt(); boost::shared_ptr pc(new ProcessConfiguration("PROD", dummyProcessPset.id(), edm::getReleaseVersion(), edm::getPassID())); - edm::MakeModuleParams params1(&p1, preg, pc); - edm::MakeModuleParams params2(&p2, preg, pc); + edm::MakeModuleParams params1(&p1, preg, &prealloc, pc); + edm::MakeModuleParams params2(&p2, preg, &prealloc, pc); std::auto_ptr lM(new WorkerMaker); ParameterSet l1; @@ -159,8 +161,8 @@ void testEDProducerProductRegistryCallback::testCircularRef() { l2.addParameter("@module_edm_type",std::string("EDProducer") ); l2.registerIt(); - edm::MakeModuleParams paramsl1(&l1, preg, pc); - edm::MakeModuleParams paramsl2(&l2, preg, pc); + edm::MakeModuleParams paramsl1(&l1, preg, &prealloc, pc); + edm::MakeModuleParams paramsl2(&l2, preg, &prealloc, pc); signalslot::Signal aSignal; @@ -213,13 +215,14 @@ void testEDProducerProductRegistryCallback::testCircularRef2() { p2.registerIt(); edm::ExceptionToActionTable table; - + edm::PreallocationConfiguration prealloc; + edm::ParameterSet dummyProcessPset; dummyProcessPset.registerIt(); boost::shared_ptr pc(new ProcessConfiguration("PROD", dummyProcessPset.id(), edm::getReleaseVersion(), edm::getPassID())); - edm::MakeModuleParams params1(&p1, preg, pc); - edm::MakeModuleParams params2(&p2, preg, pc); + edm::MakeModuleParams params1(&p1, preg, &prealloc, pc); + edm::MakeModuleParams params2(&p2, preg, &prealloc, pc); std::auto_ptr lM(new WorkerMaker); ParameterSet l1; @@ -234,8 +237,8 @@ void testEDProducerProductRegistryCallback::testCircularRef2() { l2.addParameter("@module_edm_type",std::string("EDProducer") ); l2.registerIt(); - edm::MakeModuleParams paramsl1(&l1, preg, pc); - edm::MakeModuleParams paramsl2(&l2, preg, pc); + edm::MakeModuleParams paramsl1(&l1, preg, &prealloc, pc); + edm::MakeModuleParams paramsl2(&l2, preg, &prealloc, pc); signalslot::Signal aSignal; auto ml1 = lM->makeModule(paramsl1,aSignal,aSignal); @@ -287,13 +290,14 @@ void testEDProducerProductRegistryCallback::testTwoListeners(){ p2.registerIt(); edm::ExceptionToActionTable table; - + edm::PreallocationConfiguration prealloc; + edm::ParameterSet dummyProcessPset; dummyProcessPset.registerIt(); boost::shared_ptr pc(new ProcessConfiguration("PROD", dummyProcessPset.id(), edm::getReleaseVersion(), edm::getPassID())); - edm::MakeModuleParams params1(&p1, preg, pc); - edm::MakeModuleParams params2(&p2, preg, pc); + edm::MakeModuleParams params1(&p1, preg, &prealloc, pc); + edm::MakeModuleParams params2(&p2, preg, &prealloc, pc); std::auto_ptr lM(new WorkerMaker); ParameterSet l1; @@ -309,8 +313,8 @@ void testEDProducerProductRegistryCallback::testTwoListeners(){ l2.addParameter("@module_edm_type",std::string("EDProducer") ); l2.registerIt(); - edm::MakeModuleParams paramsl1(&l1, preg, pc); - edm::MakeModuleParams paramsl2(&l2, preg, pc); + edm::MakeModuleParams paramsl1(&l1, preg, &prealloc, pc); + edm::MakeModuleParams paramsl2(&l2, preg, &prealloc, pc); signalslot::Signal aSignal; diff --git a/FWCore/Framework/test/maker2_t.cppunit.cc b/FWCore/Framework/test/maker2_t.cppunit.cc index 8975fd1bc77a3..04b7c8cb99b57 100644 --- a/FWCore/Framework/test/maker2_t.cppunit.cc +++ b/FWCore/Framework/test/maker2_t.cppunit.cc @@ -8,6 +8,7 @@ #include "FWCore/Framework/interface/Frameworkfwd.h" #include "FWCore/Framework/interface/EDProducer.h" #include "FWCore/Framework/src/WorkerT.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/Framework/interface/ExceptionActions.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" #include "FWCore/Framework/src/WorkerMaker.h" @@ -69,9 +70,10 @@ void testmaker2::maker2Test() edm::ExceptionToActionTable table; edm::ProductRegistry preg; + edm::PreallocationConfiguration prealloc; boost::shared_ptr pc(new ProcessConfiguration("PROD", edm::ParameterSetID(), edm::getReleaseVersion(), edm::getPassID())); - edm::MakeModuleParams params1(&p1, preg, pc); - edm::MakeModuleParams params2(&p2, preg, pc); + edm::MakeModuleParams params1(&p1, preg, &prealloc, pc); + edm::MakeModuleParams params2(&p2, preg, &prealloc, pc); signalslot::Signal aSignal; auto m1 = f->makeModule(params1,aSignal,aSignal); From a4953854fdcd45c47e87261f2bca0d59d0071dfa Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 30 Aug 2013 17:19:44 -0500 Subject: [PATCH 08/10] Properly preallocate all per module per stream items Percolated the PreallocationConfiguration into the module base classes and passed the number of streams to the appropriate virtual functions which are overwritten in the appropriate inheriting classes. The per Run and LuminosityBlock items are still just single items since having multiple such transitions is a later phase of the project. --- FWCore/Framework/interface/EDAnalyzer.h | 2 + FWCore/Framework/interface/EDFilter.h | 2 + FWCore/Framework/interface/EDProducer.h | 3 ++ FWCore/Framework/interface/OutputModule.h | 3 ++ FWCore/Framework/interface/Schedule.h | 2 + .../interface/global/EDAnalyzerBase.h | 3 ++ .../Framework/interface/global/EDFilterBase.h | 3 ++ .../interface/global/EDProducerBase.h | 3 ++ .../Framework/interface/global/implementors.h | 17 +++++-- .../Framework/interface/one/EDAnalyzerBase.h | 2 + FWCore/Framework/interface/one/EDFilterBase.h | 4 +- .../Framework/interface/one/EDProducerBase.h | 2 + .../interface/one/OutputModuleBase.h | 5 +- .../interface/stream/EDAnalyzerAdaptor.h | 10 +++- .../interface/stream/EDAnalyzerAdaptorBase.h | 8 ++- .../interface/stream/ProducingModuleAdaptor.h | 10 +++- .../stream/ProducingModuleAdaptorBase.h | 9 ++-- FWCore/Framework/src/ModuleHolder.h | 6 +++ FWCore/Framework/src/ModuleRegistry.cc | 6 ++- FWCore/Framework/src/ModuleRegistry.h | 4 +- FWCore/Framework/src/Schedule.cc | 3 +- FWCore/Framework/src/WorkerMaker.cc | 1 + FWCore/Framework/src/global/EDAnalyzerBase.cc | 8 +++ FWCore/Framework/src/global/EDFilterBase.cc | 7 +++ FWCore/Framework/src/global/EDProducerBase.cc | 7 +++ .../src/stream/EDAnalyzerAdaptorBase.cc | 9 +++- .../src/stream/ProducingModuleAdaptorBase.cc | 12 ++++- .../Framework/test/global_module_t.cppunit.cc | 51 ++++++++++++++----- .../Framework/test/stream_module_t.cppunit.cc | 8 ++- 29 files changed, 175 insertions(+), 35 deletions(-) diff --git a/FWCore/Framework/interface/EDAnalyzer.h b/FWCore/Framework/interface/EDAnalyzer.h index bb01b10b5d449..4721777a62f65 100644 --- a/FWCore/Framework/interface/EDAnalyzer.h +++ b/FWCore/Framework/interface/EDAnalyzer.h @@ -13,6 +13,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; @@ -41,6 +42,7 @@ namespace edm { private: bool doEvent(EventPrincipal const& ep, EventSetup const& c, ModuleCallingContext const* mcc); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); bool doBeginRun(RunPrincipal const& rp, EventSetup const& c, diff --git a/FWCore/Framework/interface/EDFilter.h b/FWCore/Framework/interface/EDFilter.h index 23809e64688e3..4159119270556 100644 --- a/FWCore/Framework/interface/EDFilter.h +++ b/FWCore/Framework/interface/EDFilter.h @@ -26,6 +26,7 @@ namespace edm { } class ModuleCallingContext; + class PreallocationConfiguration; class EDFilter : public ProducerBase, public EDConsumerBase { public: @@ -49,6 +50,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const* mcc); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); void doBeginRun(RunPrincipal& rp, EventSetup const& c, diff --git a/FWCore/Framework/interface/EDProducer.h b/FWCore/Framework/interface/EDProducer.h index 96dea9d6b2d45..27b016b705d2d 100644 --- a/FWCore/Framework/interface/EDProducer.h +++ b/FWCore/Framework/interface/EDProducer.h @@ -21,6 +21,8 @@ EDProducts into an Event. namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; + namespace maker { template class ModuleHolderT; } @@ -44,6 +46,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const* mcc); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); void doBeginRun(RunPrincipal& rp, EventSetup const& c, diff --git a/FWCore/Framework/interface/OutputModule.h b/FWCore/Framework/interface/OutputModule.h index d08f39c130fd1..d48808fafc0a0 100644 --- a/FWCore/Framework/interface/OutputModule.h +++ b/FWCore/Framework/interface/OutputModule.h @@ -31,6 +31,7 @@ output stream. namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; @@ -91,6 +92,8 @@ namespace edm { ParameterSetID selectorConfig() const { return selector_config_id_; } + void doPreallocate(PreallocationConfiguration const&) {} + void doBeginJob(); void doEndJob(); bool doEvent(EventPrincipal const& ep, EventSetup const& c, diff --git a/FWCore/Framework/interface/Schedule.h b/FWCore/Framework/interface/Schedule.h index 0c12fbd0c14a6..ea97d9b4c0ce2 100644 --- a/FWCore/Framework/interface/Schedule.h +++ b/FWCore/Framework/interface/Schedule.h @@ -68,6 +68,7 @@ #include "FWCore/Framework/src/WorkerRegistry.h" #include "FWCore/Framework/src/GlobalSchedule.h" #include "FWCore/Framework/src/StreamSchedule.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/MessageLogger/interface/ExceptionMessages.h" #include "FWCore/MessageLogger/interface/JobReport.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" @@ -243,6 +244,7 @@ namespace edm { std::unique_ptr globalSchedule_; AllOutputModuleCommunicators all_output_communicators_; + PreallocationConfiguration preallocConfig_; bool wantSummary_; diff --git a/FWCore/Framework/interface/global/EDAnalyzerBase.h b/FWCore/Framework/interface/global/EDAnalyzerBase.h index bf5498bdf558f..645ca92e536c4 100644 --- a/FWCore/Framework/interface/global/EDAnalyzerBase.h +++ b/FWCore/Framework/interface/global/EDAnalyzerBase.h @@ -30,6 +30,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; class StreamID; namespace maker { @@ -59,6 +60,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&); void doBeginJob(); void doEndJob(); @@ -105,6 +107,7 @@ namespace edm { virtual void beginJob() {} virtual void endJob(){} + virtual void preallocStreams(unsigned int); virtual void doBeginStream_(StreamID id); virtual void doEndStream_(StreamID id); virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c); diff --git a/FWCore/Framework/interface/global/EDFilterBase.h b/FWCore/Framework/interface/global/EDFilterBase.h index c80a961cb866e..c939c813f97b7 100644 --- a/FWCore/Framework/interface/global/EDFilterBase.h +++ b/FWCore/Framework/interface/global/EDFilterBase.h @@ -31,6 +31,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; class StreamID; namespace maker { @@ -60,6 +61,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&); void doBeginJob(); void doEndJob(); @@ -108,6 +110,7 @@ namespace edm { virtual void beginJob() {} virtual void endJob(){} + virtual void preallocStreams(unsigned int); virtual void doBeginStream_(StreamID id); virtual void doEndStream_(StreamID id); virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c); diff --git a/FWCore/Framework/interface/global/EDProducerBase.h b/FWCore/Framework/interface/global/EDProducerBase.h index 2c82380243026..c37896284302a 100644 --- a/FWCore/Framework/interface/global/EDProducerBase.h +++ b/FWCore/Framework/interface/global/EDProducerBase.h @@ -31,6 +31,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; class StreamID; namespace maker { @@ -60,6 +61,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&); void doBeginJob(); void doEndJob(); @@ -108,6 +110,7 @@ namespace edm { virtual void beginJob() {} virtual void endJob(){} + virtual void preallocStreams(unsigned int); virtual void doBeginStream_(StreamID id); virtual void doEndStream_(StreamID id); virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c); diff --git a/FWCore/Framework/interface/global/implementors.h b/FWCore/Framework/interface/global/implementors.h index d42a6d5fe670a..2be23c954ea19 100644 --- a/FWCore/Framework/interface/global/implementors.h +++ b/FWCore/Framework/interface/global/implementors.h @@ -41,15 +41,24 @@ namespace edm { StreamCacheHolder() = default; StreamCacheHolder( StreamCacheHolder const&) = delete; StreamCacheHolder& operator=(StreamCacheHolder const&) = delete; + ~StreamCacheHolder() { + for(auto c: caches_){ + delete c; + } + } protected: - T * streamCache(edm::StreamID iID) const { return cache_.get(); } + T * streamCache(edm::StreamID iID) const { return caches_[iID.value()]; } private: + virtual void preallocStreams(unsigned int iNStreams) override final { + caches_.resize(iNStreams,static_cast(nullptr)); + } virtual void doBeginStream_(StreamID id) override final { - cache_ = beginStream(id); + caches_[id.value()] = beginStream(id).release(); } virtual void doEndStream_(StreamID id) override final { endStream(id); - cache_.reset(); + delete caches_[id.value()]; + caches_[id.value()]=nullptr; } virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c) override final { streamBeginRun(id,rp,c); @@ -72,7 +81,7 @@ namespace edm { virtual void endStream(edm::StreamID) const {} //When threaded we will have a container for N items whre N is # of streams - std::unique_ptr cache_; + std::vector caches_; }; template diff --git a/FWCore/Framework/interface/one/EDAnalyzerBase.h b/FWCore/Framework/interface/one/EDAnalyzerBase.h index 209c7ce59ffb9..4e47fd0c1d969 100644 --- a/FWCore/Framework/interface/one/EDAnalyzerBase.h +++ b/FWCore/Framework/interface/one/EDAnalyzerBase.h @@ -30,6 +30,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; @@ -64,6 +65,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); diff --git a/FWCore/Framework/interface/one/EDFilterBase.h b/FWCore/Framework/interface/one/EDFilterBase.h index c2a6b88b42f86..3244e1173f1a2 100644 --- a/FWCore/Framework/interface/one/EDFilterBase.h +++ b/FWCore/Framework/interface/one/EDFilterBase.h @@ -31,7 +31,8 @@ namespace edm { class ModuleCallingContext; - + class PreallocationConfiguration; + namespace maker { template class ModuleHolderT; } @@ -60,6 +61,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); diff --git a/FWCore/Framework/interface/one/EDProducerBase.h b/FWCore/Framework/interface/one/EDProducerBase.h index fb2b7aefe9010..058144bc7ff7d 100644 --- a/FWCore/Framework/interface/one/EDProducerBase.h +++ b/FWCore/Framework/interface/one/EDProducerBase.h @@ -31,6 +31,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; } @@ -59,6 +60,7 @@ namespace edm { private: bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*); + void doPreallocate(PreallocationConfiguration const&) {} void doBeginJob(); void doEndJob(); diff --git a/FWCore/Framework/interface/one/OutputModuleBase.h b/FWCore/Framework/interface/one/OutputModuleBase.h index 4c0ca48c970a2..3c74da5d4a97a 100644 --- a/FWCore/Framework/interface/one/OutputModuleBase.h +++ b/FWCore/Framework/interface/one/OutputModuleBase.h @@ -45,6 +45,7 @@ namespace edm { class ModuleCallingContext; + class PreallocationConfiguration; template class OutputModuleCommunicatorT; namespace maker { @@ -104,7 +105,9 @@ namespace edm { ModuleDescription const& description() const; ParameterSetID selectorConfig() const { return selector_config_id_; } - + + void doPreallocate(PreallocationConfiguration const&) {} + void doBeginJob(); void doEndJob(); bool doEvent(EventPrincipal const& ep, EventSetup const& c, diff --git a/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h b/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h index fe8cc99df716d..e5ba3b71941bb 100644 --- a/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h +++ b/FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h @@ -70,7 +70,8 @@ namespace edm { { public: - EDAnalyzerAdaptor( edm::ParameterSet const& iPSet) + EDAnalyzerAdaptor( edm::ParameterSet const& iPSet): + m_pset(&iPSet) { m_runs.resize(1); m_lumis.resize(1); @@ -78,7 +79,6 @@ namespace edm { m_lumiSummaries.resize(1); typename T::GlobalCache const* dummy=nullptr; m_global.reset( impl::makeGlobal(iPSet,dummy).release()); - this->createStreamModules([this,&iPSet] () -> EDAnalyzerBase* {return impl::makeStreamModule(iPSet,m_global.get());}); } ~EDAnalyzerAdaptor() { } @@ -98,6 +98,11 @@ namespace edm { typedef CallGlobalLuminosityBlock MyGlobalLuminosityBlock; typedef CallGlobalLuminosityBlockSummary MyGlobalLuminosityBlockSummary; + void setupStreamModules() override final { + this->createStreamModules([this] () -> EDAnalyzerBase* {return impl::makeStreamModule(*m_pset,m_global.get());}); + m_pset= nullptr; + } + void doEndJob() override final { MyGlobal::endJob(m_global.get()); } @@ -190,6 +195,7 @@ namespace edm { typename impl::choose_shared_vec::type m_lumis; typename impl::choose_shared_vec::type m_runSummaries; typename impl::choose_shared_vec::type m_lumiSummaries; + ParameterSet const* m_pset; }; } diff --git a/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h b/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h index d8c187b67eae3..bdd164b152838 100644 --- a/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h +++ b/FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h @@ -37,6 +37,7 @@ namespace edm { class ModuleCallingContext; class ProductHolderIndexHelper; class EDConsumerBase; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; @@ -66,7 +67,9 @@ namespace edm { registerProductsAndCallbacks(EDAnalyzerAdaptorBase const*, ProductRegistry* reg); protected: template void createStreamModules(T iFunc) { - m_streamModules[0] = iFunc(); + for(auto& m: m_streamModules) { + m = iFunc(); + } } //Same interface as EDConsumerBase @@ -83,6 +86,9 @@ namespace edm { bool doEvent(EventPrincipal& ep, EventSetup const& c, ModuleCallingContext const*) ; + void doPreallocate(PreallocationConfiguration const&); + + virtual void setupStreamModules() = 0; void doBeginJob(); virtual void doEndJob() = 0; diff --git a/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h b/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h index 5f6c6e8980c07..253144b292bbd 100644 --- a/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h +++ b/FWCore/Framework/interface/stream/ProducingModuleAdaptor.h @@ -59,7 +59,8 @@ namespace edm { { public: - ProducingModuleAdaptor( edm::ParameterSet const& iPSet) + ProducingModuleAdaptor( edm::ParameterSet const& iPSet): + m_pset(&iPSet) { m_runs.resize(1); m_lumis.resize(1); @@ -67,7 +68,6 @@ namespace edm { m_lumiSummaries.resize(1); typename T::GlobalCache const* dummy=nullptr; m_global.reset( impl::makeGlobal(iPSet,dummy).release()); - this->createStreamModules([this,&iPSet] () -> M* {return impl::makeStreamModule(iPSet,m_global.get());}); } ~ProducingModuleAdaptor() { } @@ -91,6 +91,11 @@ namespace edm { typedef CallBeginLuminosityBlockProduce MyBeginLuminosityBlockProduce; typedef CallEndLuminosityBlockProduce MyEndLuminosityBlockProduce; + void setupStreamModules() override final { + this->createStreamModules([this] () -> M* {return impl::makeStreamModule(*m_pset,m_global.get());}); + m_pset= nullptr; + } + void doEndJob() override final { MyGlobal::endJob(m_global.get()); } @@ -203,6 +208,7 @@ namespace edm { typename impl::choose_shared_vec::type m_lumis; typename impl::choose_shared_vec::type m_runSummaries; typename impl::choose_shared_vec::type m_lumiSummaries; + ParameterSet const* m_pset; }; } } diff --git a/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h b/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h index e2c01b98c3644..27678d8a1efdd 100644 --- a/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h +++ b/FWCore/Framework/interface/stream/ProducingModuleAdaptorBase.h @@ -40,6 +40,7 @@ namespace edm { class ModuleCallingContext; class ProductHolderIndexHelper; class EDConsumerBase; + class PreallocationConfiguration; namespace maker { template class ModuleHolderT; @@ -75,7 +76,9 @@ namespace edm { protected: template void createStreamModules(F iFunc) { - m_streamModules[0] = iFunc(); + for(auto& m: m_streamModules) { + m = iFunc(); + } } void commit(Run& iRun) { @@ -97,8 +100,8 @@ namespace edm { const ProducingModuleAdaptorBase& operator=(const ProducingModuleAdaptorBase&) = delete; // stop default - //Inheriting classes must implement this function for the Worker class - //bool doEvent(EventPrincipal& ep, EventSetup const& c) ; + void doPreallocate(PreallocationConfiguration const&); + virtual void setupStreamModules() = 0; void doBeginJob(); virtual void doEndJob() = 0; diff --git a/FWCore/Framework/src/ModuleHolder.h b/FWCore/Framework/src/ModuleHolder.h index c00916207c3b9..e9cd3ae5ac38a 100644 --- a/FWCore/Framework/src/ModuleHolder.h +++ b/FWCore/Framework/src/ModuleHolder.h @@ -31,6 +31,7 @@ namespace edm { class ModuleDescription; class ProductRegistry; class ExceptionToActionTable; + class PreallocationConfiguration; namespace maker { class ModuleHolder { @@ -42,6 +43,7 @@ namespace edm { virtual ModuleDescription const& moduleDescription() const = 0; virtual void setModuleDescription(ModuleDescription const& iDesc) = 0; + virtual void preallocate(PreallocationConfiguration const& ) = 0; virtual void registerProductsAndCallbacks(ProductRegistry*)=0; virtual void replaceModuleFor(Worker*) const = 0; @@ -68,6 +70,10 @@ namespace edm { void setModuleDescription(ModuleDescription const& iDesc) override { module()->setModuleDescription(iDesc); } + void preallocate(PreallocationConfiguration const& iPrealloc) override { + module()->doPreallocate(iPrealloc); + } + void registerProductsAndCallbacks(ProductRegistry* iReg) override { module()->registerProductsAndCallbacks(module(),iReg); } diff --git a/FWCore/Framework/src/ModuleRegistry.cc b/FWCore/Framework/src/ModuleRegistry.cc index f5ab8547132fc..11bc6bc581726 100644 --- a/FWCore/Framework/src/ModuleRegistry.cc +++ b/FWCore/Framework/src/ModuleRegistry.cc @@ -37,7 +37,8 @@ namespace edm { maker::ModuleHolder* ModuleRegistry::replaceModule(std::string const& iModuleLabel, - edm::ParameterSet const& iPSet) { + edm::ParameterSet const& iPSet, + edm::PreallocationConfiguration const& iPrealloc) { auto modItr = labelToModule_.find(iModuleLabel); if (modItr == labelToModule_.end()) { return nullptr; @@ -46,7 +47,8 @@ namespace edm { auto modPtr= Factory::get()->makeReplacementModule(iPSet); modPtr->setModuleDescription(modItr->second->moduleDescription()); - + modPtr->preallocate(iPrealloc); + // Transfer ownership of worker to the registry modItr->second = modPtr; return modItr->second.get(); diff --git a/FWCore/Framework/src/ModuleRegistry.h b/FWCore/Framework/src/ModuleRegistry.h index 2b2eb55b0eb48..9376a230641db 100644 --- a/FWCore/Framework/src/ModuleRegistry.h +++ b/FWCore/Framework/src/ModuleRegistry.h @@ -31,6 +31,7 @@ namespace edm { class ParameterSet; class MakeModuleParams; class ModuleDescription; + class PreallocationConfiguration; namespace maker { class ModuleHolder; } @@ -44,7 +45,8 @@ namespace edm { signalslot::Signal& iPost); maker::ModuleHolder* replaceModule(std::string const& iModuleLabel, - edm::ParameterSet const& iPSet); + edm::ParameterSet const& iPSet, + edm::PreallocationConfiguration const&); template void forAllModuleHolders(F iFunc) { diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 19edff2b7d0b9..b2cc985dd8c1d 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -344,6 +344,7 @@ namespace edm { resultsInserter_{tns.getTrigPaths().empty()? std::shared_ptr{} :makeInserter(proc_pset,prealloc,preg,actions,areg,processConfiguration)}, moduleRegistry_(new ModuleRegistry()), all_output_communicators_(), + preallocConfig_(prealloc), wantSummary_(tns.wantSummary()), endpathsAreActive_(true) { @@ -902,7 +903,7 @@ namespace edm { return false; } - auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet); + auto newMod = moduleRegistry_->replaceModule(iLabel,iPSet,preallocConfig_); globalSchedule_->replaceModule(newMod,iLabel); diff --git a/FWCore/Framework/src/WorkerMaker.cc b/FWCore/Framework/src/WorkerMaker.cc index 3e9d8ddfaa1e0..7e5757c9cb3f9 100644 --- a/FWCore/Framework/src/WorkerMaker.cc +++ b/FWCore/Framework/src/WorkerMaker.cc @@ -92,6 +92,7 @@ namespace edm { pre(md); module = makeModule(*(p.pset_)); module->setModuleDescription(md); + module->preallocate(*(p.preallocate_)); module->registerProductsAndCallbacks(p.reg_); post(md); } diff --git a/FWCore/Framework/src/global/EDAnalyzerBase.cc b/FWCore/Framework/src/global/EDAnalyzerBase.cc index 57231e747eff7..e59e5d70458a1 100644 --- a/FWCore/Framework/src/global/EDAnalyzerBase.cc +++ b/FWCore/Framework/src/global/EDAnalyzerBase.cc @@ -18,6 +18,8 @@ #include "FWCore/Framework/interface/LuminosityBlock.h" #include "FWCore/Framework/interface/Run.h" #include "FWCore/Framework/src/edmodule_mightGet_config.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" + #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" @@ -56,6 +58,11 @@ namespace edm { return true; } + void + EDAnalyzerBase::doPreallocate(PreallocationConfiguration const& iPrealloc) { + preallocStreams(iPrealloc.numberOfStreams()); + } + void EDAnalyzerBase::doBeginJob() { this->beginJob(); @@ -178,6 +185,7 @@ namespace edm { //postForkReacquireResources(iChildIndex, iNumberOfChildren); } + void EDAnalyzerBase::preallocStreams(unsigned int) {} void EDAnalyzerBase::doBeginStream_(StreamID id){} void EDAnalyzerBase::doEndStream_(StreamID id) {} void EDAnalyzerBase::doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c) {} diff --git a/FWCore/Framework/src/global/EDFilterBase.cc b/FWCore/Framework/src/global/EDFilterBase.cc index 626925427a33e..967cb4be103e1 100644 --- a/FWCore/Framework/src/global/EDFilterBase.cc +++ b/FWCore/Framework/src/global/EDFilterBase.cc @@ -18,6 +18,7 @@ #include "FWCore/Framework/interface/LuminosityBlock.h" #include "FWCore/Framework/interface/Run.h" #include "FWCore/Framework/src/edmodule_mightGet_config.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" @@ -55,6 +56,11 @@ namespace edm { return returnValue; } + void + EDFilterBase::doPreallocate(PreallocationConfiguration const& iPrealloc) { + preallocStreams(iPrealloc.numberOfStreams()); + } + void EDFilterBase::doBeginJob() { this->beginJob(); @@ -184,6 +190,7 @@ namespace edm { //postForkReacquireResources(iChildIndex, iNumberOfChildren); } + void EDFilterBase::preallocStreams(unsigned int) {} void EDFilterBase::doBeginStream_(StreamID id){} void EDFilterBase::doEndStream_(StreamID id) {} void EDFilterBase::doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c) {} diff --git a/FWCore/Framework/src/global/EDProducerBase.cc b/FWCore/Framework/src/global/EDProducerBase.cc index 24a1927fa8e2b..1964e2a7cbc56 100644 --- a/FWCore/Framework/src/global/EDProducerBase.cc +++ b/FWCore/Framework/src/global/EDProducerBase.cc @@ -18,6 +18,7 @@ #include "FWCore/Framework/interface/LuminosityBlock.h" #include "FWCore/Framework/interface/Run.h" #include "FWCore/Framework/src/edmodule_mightGet_config.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" @@ -54,6 +55,11 @@ namespace edm { commit_(e,&previousParentage_, &previousParentageId_); return true; } + + void + EDProducerBase::doPreallocate(PreallocationConfiguration const& iPrealloc) { + preallocStreams(iPrealloc.numberOfStreams()); + } void EDProducerBase::doBeginJob() { @@ -184,6 +190,7 @@ namespace edm { //postForkReacquireResources(iChildIndex, iNumberOfChildren); } + void EDProducerBase::preallocStreams(unsigned int) {} void EDProducerBase::doBeginStream_(StreamID id){} void EDProducerBase::doEndStream_(StreamID id) {} void EDProducerBase::doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c) {} diff --git a/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc b/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc index 5cdf4a7e61569..4ac0c85fc997e 100644 --- a/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc +++ b/FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc @@ -22,6 +22,7 @@ #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" #include "FWCore/Framework/interface/RunPrincipal.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" using namespace edm::stream; // @@ -37,7 +38,6 @@ using namespace edm::stream; // EDAnalyzerAdaptorBase::EDAnalyzerAdaptorBase() { - m_streamModules.resize(1); } // EDAnalyzerAdaptorBase::EDAnalyzerAdaptorBase(const EDAnalyzerAdaptorBase& rhs) @@ -67,6 +67,13 @@ EDAnalyzerAdaptorBase::~EDAnalyzerAdaptorBase() // // member functions // +void +EDAnalyzerAdaptorBase::doPreallocate(PreallocationConfiguration const& iPrealloc) { + m_streamModules.resize(iPrealloc.numberOfStreams(), + static_cast(nullptr)); + setupStreamModules(); +} + void EDAnalyzerAdaptorBase::registerProductsAndCallbacks(EDAnalyzerAdaptorBase const*, ProductRegistry* reg) { for(auto mod : m_streamModules) { diff --git a/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc b/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc index 98e2f85d76912..bcfe400af988f 100644 --- a/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc +++ b/FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc @@ -18,7 +18,7 @@ #include "FWCore/Framework/interface/Run.h" #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" #include "FWCore/Framework/interface/RunPrincipal.h" - +#include "FWCore/Framework/src/PreallocationConfiguration.h" // // constants, enums and typedefs @@ -36,7 +36,6 @@ namespace edm { template< typename T> ProducingModuleAdaptorBase::ProducingModuleAdaptorBase() { - m_streamModules.resize(1); } template< typename T> @@ -50,6 +49,15 @@ namespace edm { // // member functions // + + template< typename T> + void + ProducingModuleAdaptorBase::doPreallocate(PreallocationConfiguration const& iPrealloc) { + m_streamModules.resize(iPrealloc.numberOfStreams(), + static_cast(nullptr)); + setupStreamModules(); + } + template< typename T> void ProducingModuleAdaptorBase::registerProductsAndCallbacks(ProducingModuleAdaptorBase const*, ProductRegistry* reg) { diff --git a/FWCore/Framework/test/global_module_t.cppunit.cc b/FWCore/Framework/test/global_module_t.cppunit.cc index 175b4602eb059..f16153ff0d08e 100644 --- a/FWCore/Framework/test/global_module_t.cppunit.cc +++ b/FWCore/Framework/test/global_module_t.cppunit.cc @@ -12,6 +12,8 @@ #include #include "FWCore/Framework/interface/global/EDProducer.h" #include "FWCore/Framework/src/WorkerT.h" +#include "FWCore/Framework/src/ModuleHolder.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/Framework/interface/OccurrenceTraits.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" #include "DataFormats/Provenance/interface/BranchIDListHelper.h" @@ -312,6 +314,26 @@ class testGlobalModule: public CppUnit::TestFixture }; }; +namespace { + struct ShadowStreamID { + constexpr ShadowStreamID():value(0){} + unsigned int value; + }; + + union IDUnion { + IDUnion(): m_shadow() {} + ShadowStreamID m_shadow; + edm::StreamID m_id; + }; +} +static edm::StreamID makeID() { + IDUnion u; + assert(u.m_id.value() == 0); + return u.m_id; +} +static const edm::StreamID s_streamID0 = makeID(); + + ///registration of the test so that the runner can find it CPPUNIT_TEST_SUITE_REGISTRATION(testGlobalModule); @@ -343,54 +365,54 @@ m_ep() //For each transition, bind a lambda which will call the proper method of the Worker m_transToFunc[Trans::kBeginStream] = [this](edm::Worker* iBase) { - edm::StreamContext streamContext(edm::StreamID::invalidStreamID(), nullptr); - iBase->beginStream(edm::StreamID::invalidStreamID(), streamContext); }; + edm::StreamContext streamContext(s_streamID0, nullptr); + iBase->beginStream(s_streamID0, streamContext); }; m_transToFunc[Trans::kGlobalBeginRun] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_rp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_rp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kStreamBeginRun] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_rp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_rp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kGlobalBeginLuminosityBlock] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_lbp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_lbp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kStreamBeginLuminosityBlock] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_lbp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_lbp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kEvent] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_ep,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_ep,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kStreamEndLuminosityBlock] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_lbp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_lbp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kGlobalEndLuminosityBlock] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_lbp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_lbp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kStreamEndRun] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_rp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_rp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kGlobalEndRun] = [this](edm::Worker* iBase) { typedef edm::OccurrenceTraits Traits; edm::ParentContext nullParentContext; - iBase->doWork(*m_rp,*m_es,m_timer, edm::StreamID::invalidStreamID(), nullParentContext, nullptr); }; + iBase->doWork(*m_rp,*m_es,m_timer, s_streamID0, nullParentContext, nullptr); }; m_transToFunc[Trans::kEndStream] = [this](edm::Worker* iBase) { - edm::StreamContext streamContext(edm::StreamID::invalidStreamID(), nullptr); - iBase->endStream(edm::StreamID::invalidStreamID(), streamContext); }; + edm::StreamContext streamContext(s_streamID0, nullptr); + iBase->endStream(s_streamID0, streamContext); }; } @@ -432,6 +454,9 @@ void testGlobalModule::basicTest() void testGlobalModule::streamTest() { std::unique_ptr testProd{ new StreamProd }; + edm::maker::ModuleHolderT h(testProd.get(),nullptr); + h.preallocate(edm::PreallocationConfiguration{}); + h.release(); CPPUNIT_ASSERT(0 == testProd->m_count); testTransitions(testProd.get(), {Trans::kBeginStream, Trans::kStreamBeginRun, Trans::kStreamBeginLuminosityBlock, Trans::kEvent, diff --git a/FWCore/Framework/test/stream_module_t.cppunit.cc b/FWCore/Framework/test/stream_module_t.cppunit.cc index a31416dc3cae6..ce9b7a7a3dcce 100644 --- a/FWCore/Framework/test/stream_module_t.cppunit.cc +++ b/FWCore/Framework/test/stream_module_t.cppunit.cc @@ -11,6 +11,8 @@ #include #include "FWCore/Framework/src/Worker.h" #include "FWCore/Framework/src/WorkerT.h" +#include "FWCore/Framework/src/ModuleHolder.h" +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/Framework/interface/stream/EDProducer.h" #include "FWCore/Framework/interface/stream/EDProducerAdaptor.h" #include "FWCore/Framework/interface/OccurrenceTraits.h" @@ -448,7 +450,11 @@ namespace { template std::unique_ptr createModule() { edm::ParameterSet pset; - return std::unique_ptr(new edm::stream::EDProducerAdaptor(pset)); + std::unique_ptr retValue(new edm::stream::EDProducerAdaptor(pset)); + edm::maker::ModuleHolderT h(retValue.get(),nullptr); + h.preallocate(edm::PreallocationConfiguration{}); + h.release(); + return retValue; } template void From 968475e7ce745d9e4f11a63d4897837e7d04ba7e Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 30 Aug 2013 18:00:47 -0500 Subject: [PATCH 09/10] Allow more than one stream in the job If more than one stream is requested in the configuration, create them and then distribute the events using a round-robin algorithm. --- FWCore/Framework/src/EventProcessor.cc | 28 +++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 8f4328699cbdb..0cd3221068609 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -594,24 +594,33 @@ namespace edm { unsigned int nThreads=1; if(optionsPset.existsAs("numberOfThreads",false)) { nThreads = optionsPset.getUntrackedParameter("numberOfThreads"); + if(nThreads == 0) { + nThreads = 1; + } } + /* TODO: when we support having each stream run in a different thread use this default + unsigned int nStreams =nThreads; + */ unsigned int nStreams =1; - /* if(optionsPset.existsAs("numberOfStreams",false)) { - nThreads = optionsPset.getUntrackedParameter("numberOfStreams"); + nStreams = optionsPset.getUntrackedParameter("numberOfStreams"); + if(nStreams==0) { + nStreams = nThreads; + } } + /* bool nRunsSet = false; */ unsigned int nConcurrentRuns =1; /* if(nRunsSet = optionsPset.existsAs("numberOfConcurrentRuns",false)) { - nThreads = optionsPset.getUntrackedParameter("numberOfConcurrentRuns"); + nConcurrentRuns = optionsPset.getUntrackedParameter("numberOfConcurrentRuns"); } */ unsigned int nConcurrentLumis =1; /* if(optionsPset.existsAs("numberOfConcurrentLuminosityBlocks",false)) { - nThreads = optionsPset.getUntrackedParameter("numberOfConcurrentLuminosityBlocks"); + nConcurrentLumis = optionsPset.getUntrackedParameter("numberOfConcurrentLuminosityBlocks"); } else { nConcurrentLumis = nConcurrentRuns; } @@ -673,7 +682,7 @@ namespace edm { nConcurrentRuns=1; } - preallocations_ = PreallocationConfiguration(nThreads,nStreams,nConcurrentLumis,nConcurrentRuns); + preallocations_ = PreallocationConfiguration{nThreads,nStreams,nConcurrentLumis,nConcurrentRuns}; // initialize the input source input_ = makeInput(*parameterSet, *common, *items.preg_, items.branchIDListHelper_, items.actReg_, items.processConfiguration_); @@ -2239,9 +2248,14 @@ namespace edm { //While all the following item types are isEvent, process them right here asyncStopRequestedWhileProcessingEvents_ = false; + + //We will round-robin which stream to use + unsigned int nextStreamIndex=0; + const unsigned int kNumStreams = preallocations_.numberOfStreams(); do { - readEvent(0); - processEvent(0); + readEvent(nextStreamIndex); + processEvent(nextStreamIndex); + nextStreamIndex = (nextStreamIndex+1) % kNumStreams; if(shouldWeStop()) { break; From 6a73fa0d1b3fae19e19c1049752124a503d42a8a Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Sat, 31 Aug 2013 11:55:06 -0500 Subject: [PATCH 10/10] Updated SecondaryEventProvider to use the new constructor for WorkerManager --- Mixing/Base/src/SecondaryEventProvider.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Mixing/Base/src/SecondaryEventProvider.cc b/Mixing/Base/src/SecondaryEventProvider.cc index ee469c4f03d88..6a727f08ab44c 100644 --- a/Mixing/Base/src/SecondaryEventProvider.cc +++ b/Mixing/Base/src/SecondaryEventProvider.cc @@ -1,5 +1,5 @@ #include "Mixing/Base/src/SecondaryEventProvider.h" - +#include "FWCore/Framework/src/PreallocationConfiguration.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/Utilities/interface/StreamID.h" @@ -11,10 +11,12 @@ namespace edm { workerManager_(boost::shared_ptr(new ActivityRegistry), actions) { std::vector shouldBeUsedLabels; std::set unscheduledLabels; + const PreallocationConfiguration preallocConfig; for(auto& pset : psets) { std::string label = pset.getParameter("@module_label"); workerManager_.addToUnscheduledWorkers(pset, preg, + &preallocConfig, processConfiguration, label, false,