Skip to content

Commit

Permalink
Merge pull request #685 from Dr15Jones/multipleStreamsInEventProcessor
Browse files Browse the repository at this point in the history
Multiple streams in cmsRun
  • Loading branch information
nclopezo committed Sep 2, 2013
2 parents 2fdd3d8 + 919dc03 commit 4b528be
Show file tree
Hide file tree
Showing 50 changed files with 463 additions and 185 deletions.
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/EDAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;

namespace maker {
template<typename T> class ModuleHolderT;
Expand Down Expand Up @@ -41,6 +42,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
ModuleCallingContext const* mcc);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();
bool doBeginRun(RunPrincipal const& rp, EventSetup const& c,
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/EDFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace edm {
}

class ModuleCallingContext;
class PreallocationConfiguration;

class EDFilter : public ProducerBase, public EDConsumerBase {
public:
Expand All @@ -49,6 +50,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const* mcc);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();
void doBeginRun(RunPrincipal& rp, EventSetup const& c,
Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/EDProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ EDProducts into an Event.
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;

namespace maker {
template<typename T> class ModuleHolderT;
}
Expand All @@ -44,6 +46,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const* mcc);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();
void doBeginRun(RunPrincipal& rp, EventSetup const& c,
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/EventPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace edm {
boost::shared_ptr<BranchIDListHelper const> branchIDListHelper,
ProcessConfiguration const& pc,
HistoryAppender* historyAppender,
StreamID const& streamID = StreamID::invalidStreamID());
unsigned int streamIndex = 0);
~EventPrincipal() {}

void fillEventPrincipal(EventAuxiliary const& aux,
Expand Down
14 changes: 14 additions & 0 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ configured in the user's main() function, and is set running.

#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/IEventProcessor.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/Framework/src/PrincipalCache.h"
#include "FWCore/Framework/src/SignallingProductRegistry.h"
#include "FWCore/Framework/src/PreallocationConfiguration.h"
Expand Down Expand Up @@ -303,6 +304,15 @@ namespace edm {
}

void possiblyContinueAfterForkChildFailure();

//read the next event using Stream iStreamIndex
void readEvent(unsigned int iStreamIndex);

//process the already read event using Stream iStreamIndex
void processEvent(unsigned int iStreamIndex);

//returns true if an asynchronous stop was requested
bool checkForAsyncStopRequest(StatusCode&);
//------------------------------------------------------------------
//
// Data members below.
Expand Down Expand Up @@ -360,6 +370,10 @@ namespace edm {

PreallocationConfiguration preallocations_;

bool asyncStopRequestedWhileProcessingEvents_;
InputSource::ItemType nextItemTypeFromProcessingEvents_;
StatusCode asyncStopStatusCodeFromProcessingEvents_;

typedef std::set<std::pair<std::string, std::string> > ExcludedData;
typedef std::map<std::string, ExcludedData> ExcludedDataMap;
ExcludedDataMap eventSetupDataToExcludeFromPrefetching_;
Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/OutputModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ output stream.
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;

namespace maker {
template<typename T> class ModuleHolderT;
Expand Down Expand Up @@ -91,6 +92,8 @@ namespace edm {

ParameterSetID selectorConfig() const { return selector_config_id_; }

void doPreallocate(PreallocationConfiguration const&) {}

void doBeginJob();
void doEndJob();
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "FWCore/Framework/src/WorkerRegistry.h"
#include "FWCore/Framework/src/GlobalSchedule.h"
#include "FWCore/Framework/src/StreamSchedule.h"
#include "FWCore/Framework/src/PreallocationConfiguration.h"
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
Expand Down Expand Up @@ -243,6 +244,7 @@ namespace edm {
std::unique_ptr<GlobalSchedule> globalSchedule_;

AllOutputModuleCommunicators all_output_communicators_;
PreallocationConfiguration preallocConfig_;


bool wantSummary_;
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace edm {
void doBeginJob();
void doEndJob();

void doEvent(EventPrincipal const& principal, IOVSyncValue const& ts);
void doEvent(EventPrincipal const& principal);

void doBeginRun(RunPrincipal const& principal, IOVSyncValue const& ts);

Expand Down Expand Up @@ -204,7 +204,7 @@ namespace edm {
private:
void beginJob();
void endJob();
void process(EventPrincipal const& e, IOVSyncValue const& ts);
void process(EventPrincipal const& e);
void beginRun(RunPrincipal const& r, IOVSyncValue const& ts);
void endRun(RunPrincipal const& r, IOVSyncValue const& ts, bool cleaningUpAfterException);
void beginLuminosityBlock(LuminosityBlockPrincipal const& lb, IOVSyncValue const& ts);
Expand Down
15 changes: 9 additions & 6 deletions FWCore/Framework/interface/WorkerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace edm {
class StreamID;
class StreamContext;
class ModuleRegistry;
class PreallocationConfiguration;

class WorkerManager {
public:
Expand All @@ -35,12 +36,13 @@ namespace edm {
boost::shared_ptr<ActivityRegistry> actReg,
ExceptionToActionTable const& actions);
void addToUnscheduledWorkers(ParameterSet& pset,
ProductRegistry& preg,
boost::shared_ptr<ProcessConfiguration> processConfiguration,
std::string label,
bool useStopwatch,
std::set<std::string>& unscheduledLabels,
std::vector<std::string>& shouldBeUsedLabels);
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
boost::shared_ptr<ProcessConfiguration> processConfiguration,
std::string label,
bool useStopwatch,
std::set<std::string>& unscheduledLabels,
std::vector<std::string>& shouldBeUsedLabels);

void setOnDemandProducts(ProductRegistry& pregistry, std::set<std::string> const& unscheduledLabels) const;

Expand All @@ -67,6 +69,7 @@ namespace edm {

Worker* getWorker(ParameterSet& pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
boost::shared_ptr<ProcessConfiguration const> processConfiguration,
std::string const& label);

Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/global/EDAnalyzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

namespace edm {
class ModuleCallingContext;
class PreallocationConfiguration;
class StreamID;

namespace maker {
Expand Down Expand Up @@ -59,6 +60,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&);
void doBeginJob();
void doEndJob();

Expand Down Expand Up @@ -105,6 +107,7 @@ namespace edm {
virtual void beginJob() {}
virtual void endJob(){}

virtual void preallocStreams(unsigned int);
virtual void doBeginStream_(StreamID id);
virtual void doEndStream_(StreamID id);
virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c);
Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/global/EDFilterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace edm {
class ModuleCallingContext;
class PreallocationConfiguration;
class StreamID;

namespace maker {
Expand Down Expand Up @@ -60,6 +61,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&);
void doBeginJob();
void doEndJob();

Expand Down Expand Up @@ -108,6 +110,7 @@ namespace edm {
virtual void beginJob() {}
virtual void endJob(){}

virtual void preallocStreams(unsigned int);
virtual void doBeginStream_(StreamID id);
virtual void doEndStream_(StreamID id);
virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c);
Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/global/EDProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace edm {
class ModuleCallingContext;
class PreallocationConfiguration;
class StreamID;

namespace maker {
Expand Down Expand Up @@ -60,6 +61,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&);
void doBeginJob();
void doEndJob();

Expand Down Expand Up @@ -108,6 +110,7 @@ namespace edm {
virtual void beginJob() {}
virtual void endJob(){}

virtual void preallocStreams(unsigned int);
virtual void doBeginStream_(StreamID id);
virtual void doEndStream_(StreamID id);
virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c);
Expand Down
17 changes: 13 additions & 4 deletions FWCore/Framework/interface/global/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,24 @@ namespace edm {
StreamCacheHolder() = default;
StreamCacheHolder( StreamCacheHolder<T,C> const&) = delete;
StreamCacheHolder<T,C>& operator=(StreamCacheHolder<T,C> const&) = delete;
~StreamCacheHolder() {
for(auto c: caches_){
delete c;
}
}
protected:
T * streamCache(edm::StreamID iID) const { return cache_.get(); }
T * streamCache(edm::StreamID iID) const { return caches_[iID.value()]; }
private:
virtual void preallocStreams(unsigned int iNStreams) override final {
caches_.resize(iNStreams,static_cast<C*>(nullptr));
}
virtual void doBeginStream_(StreamID id) override final {
cache_ = beginStream(id);
caches_[id.value()] = beginStream(id).release();
}
virtual void doEndStream_(StreamID id) override final {
endStream(id);
cache_.reset();
delete caches_[id.value()];
caches_[id.value()]=nullptr;
}
virtual void doStreamBeginRun_(StreamID id, Run const& rp, EventSetup const& c) override final {
streamBeginRun(id,rp,c);
Expand All @@ -72,7 +81,7 @@ namespace edm {
virtual void endStream(edm::StreamID) const {}

//When threaded we will have a container for N items whre N is # of streams
std::unique_ptr<C> cache_;
std::vector<C*> caches_;
};

template <typename T, typename C>
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/one/EDAnalyzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;

namespace maker {
template<typename T> class ModuleHolderT;
Expand Down Expand Up @@ -64,6 +65,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();

Expand Down
4 changes: 3 additions & 1 deletion FWCore/Framework/interface/one/EDFilterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
namespace edm {

class ModuleCallingContext;

class PreallocationConfiguration;

namespace maker {
template<typename T> class ModuleHolderT;
}
Expand Down Expand Up @@ -60,6 +61,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();

Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/interface/one/EDProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;
namespace maker {
template<typename T> class ModuleHolderT;
}
Expand Down Expand Up @@ -59,6 +60,7 @@ namespace edm {
private:
bool doEvent(EventPrincipal& ep, EventSetup const& c,
ModuleCallingContext const*);
void doPreallocate(PreallocationConfiguration const&) {}
void doBeginJob();
void doEndJob();

Expand Down
5 changes: 4 additions & 1 deletion FWCore/Framework/interface/one/OutputModuleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
namespace edm {

class ModuleCallingContext;
class PreallocationConfiguration;
template <typename T> class OutputModuleCommunicatorT;

namespace maker {
Expand Down Expand Up @@ -104,7 +105,9 @@ namespace edm {
ModuleDescription const& description() const;

ParameterSetID selectorConfig() const { return selector_config_id_; }


void doPreallocate(PreallocationConfiguration const&) {}

void doBeginJob();
void doEndJob();
bool doEvent(EventPrincipal const& ep, EventSetup const& c,
Expand Down
10 changes: 8 additions & 2 deletions FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ namespace edm {
{

public:
EDAnalyzerAdaptor( edm::ParameterSet const& iPSet)
EDAnalyzerAdaptor( edm::ParameterSet const& iPSet):
m_pset(&iPSet)
{
m_runs.resize(1);
m_lumis.resize(1);
m_runSummaries.resize(1);
m_lumiSummaries.resize(1);
typename T::GlobalCache const* dummy=nullptr;
m_global.reset( impl::makeGlobal<T>(iPSet,dummy).release());
this->createStreamModules([this,&iPSet] () -> EDAnalyzerBase* {return impl::makeStreamModule<T>(iPSet,m_global.get());});
}
~EDAnalyzerAdaptor() {
}
Expand All @@ -98,6 +98,11 @@ namespace edm {
typedef CallGlobalLuminosityBlock<T> MyGlobalLuminosityBlock;
typedef CallGlobalLuminosityBlockSummary<T> MyGlobalLuminosityBlockSummary;

void setupStreamModules() override final {
this->createStreamModules([this] () -> EDAnalyzerBase* {return impl::makeStreamModule<T>(*m_pset,m_global.get());});
m_pset= nullptr;
}

void doEndJob() override final {
MyGlobal::endJob(m_global.get());
}
Expand Down Expand Up @@ -195,6 +200,7 @@ namespace edm {
typename impl::choose_shared_vec<typename T::LuminosityBlockCache const>::type m_lumis;
typename impl::choose_shared_vec<typename T::RunSummaryCache>::type m_runSummaries;
typename impl::choose_shared_vec<typename T::LuminosityBlockSummaryCache>::type m_lumiSummaries;
ParameterSet const* m_pset;
};
}

Expand Down
Loading

0 comments on commit 4b528be

Please sign in to comment.