diff --git a/FWCore/Framework/interface/EDConsumerBase.h b/FWCore/Framework/interface/EDConsumerBase.h index 96d3a1d13a0a4..a584ab79c1ec0 100644 --- a/FWCore/Framework/interface/EDConsumerBase.h +++ b/FWCore/Framework/interface/EDConsumerBase.h @@ -88,6 +88,7 @@ namespace edm { void itemsToGet(BranchType, std::vector&) const; void itemsMayGet(BranchType, std::vector&) const; + //used for prefetching std::vector const& itemsToGetFrom(BranchType iType) const { return itemsToGetFromBranch_[iType]; } @@ -237,6 +238,9 @@ namespace edm { iRecord.type()); } + //used for FinalPath + void resetItemsToGetFrom(BranchType iType) { itemsToGetFromBranch_[iType].clear(); } + private: virtual void registerLateConsumes(eventsetup::ESRecordsToProxyIndices const&) {} unsigned int recordConsumes(BranchType iBranch, TypeToGet const& iType, edm::InputTag const& iTag, bool iAlwaysGets); diff --git a/FWCore/Framework/interface/EventProcessor.h b/FWCore/Framework/interface/EventProcessor.h index 3ba6b50dd5131..7ec443f5c7be7 100644 --- a/FWCore/Framework/interface/EventProcessor.h +++ b/FWCore/Framework/interface/EventProcessor.h @@ -152,14 +152,6 @@ namespace edm { /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents() int totalEventsFailed() const; - /// Turn end_paths "off" if "active" is false; - /// turn end_paths "on" if "active" is true. - void enableEndPaths(bool active); - - /// Return true if end_paths are active, and false if they are - /// inactive. - bool endPathsEnabled() const; - /// Clears counters used by trigger report. void clearCounters(); diff --git a/FWCore/Framework/interface/OutputModuleCore.h b/FWCore/Framework/interface/OutputModuleCore.h new file mode 100644 index 0000000000000..dcf201bf91fec --- /dev/null +++ b/FWCore/Framework/interface/OutputModuleCore.h @@ -0,0 +1,254 @@ +#ifndef FWCore_Framework_one_OutputModuleCore_h +#define FWCore_Framework_one_OutputModuleCore_h +// -*- C++ -*- +// +// Package: FWCore/Framework +// Class : OutputModuleCore +// +/**\class OutputModuleCore OutputModuleCore.h "FWCore/Framework/interface/one/OutputModuleCore.h" + + Description: Base class for all 'one' OutputModules + + Usage: + + +*/ +// +// Original Author: Chris Jones +// Created: Wed, 31 Jul 2013 15:37:16 GMT +// + +// system include files +#include +#include +#include +#include +#include +#include +#include + +// user include files +#include "DataFormats/Provenance/interface/BranchID.h" +#include "DataFormats/Provenance/interface/BranchIDList.h" +#include "DataFormats/Provenance/interface/ModuleDescription.h" +#include "DataFormats/Provenance/interface/SelectedProducts.h" + +#include "FWCore/Common/interface/FWCoreCommonFwd.h" +#include "FWCore/Common/interface/OutputProcessBlockHelper.h" +#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h" +#include "FWCore/Framework/interface/Frameworkfwd.h" +#include "FWCore/Framework/interface/ProductSelectorRules.h" +#include "FWCore/Framework/interface/ProductSelector.h" +#include "FWCore/Framework/interface/EDConsumerBase.h" +#include "FWCore/Framework/interface/getAllTriggerNames.h" +#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" +#include "FWCore/Concurrency/interface/WaitingTaskHolder.h" +#include "FWCore/Utilities/interface/propagate_const.h" + +// forward declarations +namespace edm { + + class MergeableRunProductMetadata; + class ModuleCallingContext; + class PreallocationConfiguration; + class ActivityRegistry; + class ThinnedAssociationsHelper; + + template + class OutputModuleCommunicatorT; + + namespace maker { + template + class ModuleHolderT; + } + + namespace core { + + class OutputModuleCore : public EDConsumerBase { + public: + template + friend class edm::maker::ModuleHolderT; + template + friend class ::edm::WorkerT; + template + friend class ::edm::OutputModuleCommunicatorT; + typedef OutputModuleCore ModuleType; + + explicit OutputModuleCore(ParameterSet const& pset); + ~OutputModuleCore() override; + + OutputModuleCore(OutputModuleCore const&) = delete; // Disallow copying and moving + OutputModuleCore& operator=(OutputModuleCore const&) = delete; // Disallow copying and moving + + /// Accessor for maximum number of events to be written. + /// -1 is used for unlimited. + int maxEvents() const { return maxEvents_; } + + /// Accessor for remaining number of events to be written. + /// -1 is used for unlimited. + int remainingEvents() const { return remainingEvents_; } + + bool selected(BranchDescription const& desc) const; + + void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&); + std::string const& processName() const { return process_name_; } + SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; } + std::array const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; } + + static void fillDescription( + ParameterSetDescription& desc, + std::vector const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings()); + static void fillDescriptions(ConfigurationDescriptions& descriptions); + static const std::string& baseType(); + static void prevalidate(ConfigurationDescriptions&); + + bool wantAllEvents() const { return wantAllEvents_; } + + BranchIDLists const* branchIDLists() const; + + OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; } + + ThinnedAssociationsHelper const* thinnedAssociationsHelper() const; + + const ModuleDescription& moduleDescription() const { return moduleDescription_; } + + protected: + ModuleDescription const& description() const; + + ParameterSetID selectorConfig() const { return selector_config_id_; } + + void doPreallocate_(PreallocationConfiguration const&); + virtual void preallocLumis(unsigned int); + + void doBeginJob_(); + void doEndJob(); + bool doEvent_(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*); + void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} + void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} + void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} + bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*); + bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*); + bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); + bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); + + void setEventSelectionInfo( + std::map>> const& outputModulePathPositions, + bool anyProductProduced); + + void configure(OutputModuleDescription const& desc); + + std::map const& droppedBranchIDToKeptBranchID() { + return droppedBranchIDToKeptBranchID_; + } + + //inheriting classes decrement this in doEvent in a manner that will be thread-safe for that class + std::atomic remainingEvents_; + + private: + int maxEvents_; + + // TODO: Give OutputModule + // an interface (protected?) that supplies client code with the + // needed functionality *without* giving away implementation + // details ... don't just return a reference to keptProducts_, because + // we are looking to have the flexibility to change the + // implementation of keptProducts_ without modifying clients. When this + // change is made, we'll have a one-time-only task of modifying + // clients (classes derived from OutputModule) to use the + // newly-introduced interface. + // TODO: Consider using shared pointers here? + + // keptProducts_ are pointers to the BranchDescription objects describing + // the branches we are to write. + // + // We do not own the BranchDescriptions to which we point. + SelectedProductsForBranchType keptProducts_; + std::array hasNewlyDroppedBranch_; + + std::string process_name_; + ProductSelectorRules productSelectorRules_; + ProductSelector productSelector_; + ModuleDescription moduleDescription_; + + bool wantAllEvents_; + std::vector selectors_; + ParameterSet selectEvents_; + std::vector tokensForEndPaths_; //needed for FinalPath + bool onFinalPath_ = false; + // ID of the ParameterSet that configured the event selector + // subsystem. + ParameterSetID selector_config_id_; + + // needed because of possible EDAliases. + // filled in only if key and value are different. + std::map droppedBranchIDToKeptBranchID_; + edm::propagate_const> branchIDLists_; + BranchIDLists const* origBranchIDLists_; + + edm::propagate_const> thinnedAssociationsHelper_; + std::map keepAssociation_; + + OutputProcessBlockHelper outputProcessBlockHelper_; + + //------------------------------------------------------------------ + // private member functions + //------------------------------------------------------------------ + void updateBranchIDListsWithKeptAliases(); + + void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*); + void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*); + void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*); + void doOpenFile(FileBlock const& fb); + void doRespondToOpenInputFile(FileBlock const& fb); + void doRespondToCloseInputFile(FileBlock const& fb); + void doRespondToCloseOutputFile() {} + void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {} + + /// Tell the OutputModule that is must end the current file. + void doCloseFile(); + + void registerProductsAndCallbacks(OutputModuleCore const*, ProductRegistry const*) {} + + bool needToRunSelection() const; + std::vector productsUsedBySelection() const; + bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*); + + // Do the end-of-file tasks; this is only called internally, after + // the appropriate tests have been done. + virtual void reallyCloseFile(); + + /// Ask the OutputModule if we should end the current file. + virtual bool shouldWeCloseFile() const { return false; } + + virtual void write(EventForOutput const&) = 0; + + virtual void beginJob() {} + virtual void endJob() {} + virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0; + virtual void writeRun(RunForOutput const&) = 0; + virtual void writeProcessBlock(ProcessBlockForOutput const&) {} + virtual void openFile(FileBlock const&) {} + virtual bool isFileOpen() const { return true; } + + virtual void doBeginRun_(RunForOutput const&) {} + virtual void doEndRun_(RunForOutput const&) {} + virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {} + virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {} + virtual void doRespondToOpenInputFile_(FileBlock const&) {} + virtual void doRespondToCloseInputFile_(FileBlock const&) {} + + virtual void setProcessesWithSelectedMergeableRunProducts(std::set const&) {} + + bool hasAccumulator() const { return false; } + + void keepThisBranch(BranchDescription const& desc, + std::map& trueBranchIDToKeptBranchDesc, + std::set& keptProductsInEvent); + + void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; } + + bool limitReached() const { return remainingEvents_ == 0; } + }; + } // namespace core +} // namespace edm +#endif diff --git a/FWCore/Framework/interface/Schedule.h b/FWCore/Framework/interface/Schedule.h index 4bf25e4c2f935..c2bb8094e6cee 100644 --- a/FWCore/Framework/interface/Schedule.h +++ b/FWCore/Framework/interface/Schedule.h @@ -253,14 +253,6 @@ namespace edm { /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents() int totalEventsFailed() const; - /// Turn end_paths "off" if "active" is false; - /// turn end_paths "on" if "active" is true. - void enableEndPaths(bool active); - - /// Return true if end_paths are active, and false if they are - /// inactive. - bool endPathsEnabled() const; - /// Return the trigger report information on paths, /// modules-in-path, modules-in-endpath, and modules. void getTriggerReport(TriggerReport& rep) const; @@ -319,8 +311,6 @@ namespace edm { std::vector const* pathNames_; std::vector const* endPathNames_; bool wantSummary_; - - volatile bool endpathsAreActive_; }; template diff --git a/FWCore/Framework/interface/StreamSchedule.h b/FWCore/Framework/interface/StreamSchedule.h index 563b03a03f767..eed016f5862f3 100644 --- a/FWCore/Framework/interface/StreamSchedule.h +++ b/FWCore/Framework/interface/StreamSchedule.h @@ -233,14 +233,6 @@ namespace edm { /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents() int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); } - /// Turn end_paths "off" if "active" is false; - /// turn end_paths "on" if "active" is true. - void enableEndPaths(bool active); - - /// Return true if end_paths are active, and false if they are - /// inactive. - bool endPathsEnabled() const; - /// Return the trigger report information on paths, /// modules-in-path, modules-in-endpath, and modules. void getTriggerReport(TriggerReport& rep) const; @@ -368,7 +360,6 @@ namespace edm { StreamID streamID_; StreamContext streamContext_; - volatile bool endpathsAreActive_; std::atomic skippingEvent_; }; diff --git a/FWCore/Framework/interface/SubProcess.h b/FWCore/Framework/interface/SubProcess.h index 5ca673eb388f5..46a6829065d2f 100644 --- a/FWCore/Framework/interface/SubProcess.h +++ b/FWCore/Framework/interface/SubProcess.h @@ -208,20 +208,6 @@ namespace edm { return schedule_->totalEventsFailed(); } - /// Turn end_paths "off" if "active" is false; - /// Turn end_paths "on" if "active" is true. - void enableEndPaths(bool active) { - ServiceRegistry::Operate operate(serviceToken_); - schedule_->enableEndPaths(active); - for_all(subProcesses_, [active](auto& subProcess) { subProcess.enableEndPaths(active); }); - } - - /// Return true if end_paths are active, and false if they are inactive. - bool endPathsEnabled() const { - ServiceRegistry::Operate operate(serviceToken_); - return schedule_->endPathsEnabled(); - } - /// Return the trigger report information on paths, /// modules-in-path, modules-in-endpath, and modules. void getTriggerReport(TriggerReport& rep) const { diff --git a/FWCore/Framework/interface/global/OutputModuleBase.h b/FWCore/Framework/interface/global/OutputModuleBase.h index bee9eda7130d9..0b7dbf36c1a51 100644 --- a/FWCore/Framework/interface/global/OutputModuleBase.h +++ b/FWCore/Framework/interface/global/OutputModuleBase.h @@ -17,53 +17,16 @@ // // system include files -#include -#include -#include -#include -#include -#include -#include -#include // user include files -#include "DataFormats/Provenance/interface/BranchID.h" -#include "DataFormats/Provenance/interface/BranchIDList.h" -#include "DataFormats/Provenance/interface/ModuleDescription.h" -#include "DataFormats/Provenance/interface/SelectedProducts.h" - -#include "FWCore/Common/interface/FWCoreCommonFwd.h" -#include "FWCore/Common/interface/OutputProcessBlockHelper.h" -#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h" -#include "FWCore/Framework/interface/Frameworkfwd.h" -#include "FWCore/Framework/interface/ProductSelectorRules.h" -#include "FWCore/Framework/interface/ProductSelector.h" -#include "FWCore/Framework/interface/EDConsumerBase.h" -#include "FWCore/Framework/interface/getAllTriggerNames.h" -#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" -#include "FWCore/Concurrency/interface/WaitingTaskHolder.h" -#include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Framework/interface/OutputModuleCore.h" // forward declarations namespace edm { - class MergeableRunProductMetadata; - class ModuleCallingContext; - class PreallocationConfiguration; - class ActivityRegistry; - class ThinnedAssociationsHelper; - - template - class OutputModuleCommunicatorT; - - namespace maker { - template - class ModuleHolderT; - } - namespace global { - class OutputModuleBase : public EDConsumerBase { + class OutputModuleBase : public core::OutputModuleCore { public: template friend class edm::maker::ModuleHolderT; @@ -74,41 +37,10 @@ namespace edm { typedef OutputModuleBase ModuleType; explicit OutputModuleBase(ParameterSet const& pset); - ~OutputModuleBase() override; OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving - /// Accessor for maximum number of events to be written. - /// -1 is used for unlimited. - int maxEvents() const { return maxEvents_; } - - /// Accessor for remaining number of events to be written. - /// -1 is used for unlimited. - int remainingEvents() const { return remainingEvents_; } - - bool selected(BranchDescription const& desc) const; - - void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&); - std::string const& processName() const { return process_name_; } - SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; } - std::array const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; } - - static void fillDescription(ParameterSetDescription& desc); - static void fillDescriptions(ConfigurationDescriptions& descriptions); - static const std::string& baseType(); - static void prevalidate(ConfigurationDescriptions&); - - bool wantAllEvents() const { return wantAllEvents_; } - - BranchIDLists const* branchIDLists() const; - - OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; } - - ThinnedAssociationsHelper const* thinnedAssociationsHelper() const; - - const ModuleDescription& moduleDescription() const { return moduleDescription_; } - //Output modules always need writeRun and writeLumi to be called bool wantsGlobalRuns() const { return true; } bool wantsGlobalLuminosityBlocks() const { return true; } @@ -119,14 +51,9 @@ namespace edm { virtual bool wantsStreamLuminosityBlocks() const = 0; protected: - ModuleDescription const& description() const; - - ParameterSetID selectorConfig() const { return selector_config_id_; } - void doPreallocate(PreallocationConfiguration const&); void doBeginJob(); - void doEndJob(); void doBeginStream(StreamID id) { doBeginStream_(id); } void doEndStream(StreamID id) { doEndStream_(id); } @@ -141,113 +68,12 @@ namespace edm { ModuleCallingContext const& iModuleCallingContext, Principal const& iPrincipal) const {} - void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - - void setEventSelectionInfo( - std::map>> const& outputModulePathPositions, - bool anyProductProduced); - - void configure(OutputModuleDescription const& desc); - - std::map const& droppedBranchIDToKeptBranchID() { - return droppedBranchIDToKeptBranchID_; - } - private: - int maxEvents_; - std::atomic remainingEvents_; - - // TODO: Give OutputModule - // an interface (protected?) that supplies client code with the - // needed functionality *without* giving away implementation - // details ... don't just return a reference to keptProducts_, because - // we are looking to have the flexibility to change the - // implementation of keptProducts_ without modifying clients. When this - // change is made, we'll have a one-time-only task of modifying - // clients (classes derived from OutputModule) to use the - // newly-introduced interface. - // TODO: Consider using shared pointers here? - - // keptProducts_ are pointers to the BranchDescription objects describing - // the branches we are to write. - // - // We do not own the BranchDescriptions to which we point. - SelectedProductsForBranchType keptProducts_; - std::array hasNewlyDroppedBranch_; - - std::string process_name_; - ProductSelectorRules productSelectorRules_; - ProductSelector productSelector_; - ModuleDescription moduleDescription_; - - bool wantAllEvents_; - std::vector selectors_; - ParameterSet selectEvents_; - // ID of the ParameterSet that configured the event selector - // subsystem. - ParameterSetID selector_config_id_; - - // needed because of possible EDAliases. - // filled in only if key and value are different. - std::map droppedBranchIDToKeptBranchID_; - edm::propagate_const> branchIDLists_; - BranchIDLists const* origBranchIDLists_; - - edm::propagate_const> thinnedAssociationsHelper_; - std::map keepAssociation_; - - OutputProcessBlockHelper outputProcessBlockHelper_; - - //------------------------------------------------------------------ - // private member functions - //------------------------------------------------------------------ - - void updateBranchIDListsWithKeptAliases(); - - void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*); - void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*); - void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*); - void doOpenFile(FileBlock const& fb); - void doRespondToOpenInputFile(FileBlock const& fb); - void doRespondToCloseInputFile(FileBlock const& fb); - void doRespondToCloseOutputFile() {} - void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {} - std::string workerType() const { return "WorkerT"; } - /// Tell the OutputModule that is must end the current file. - void doCloseFile(); - void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {} - bool needToRunSelection() const; - std::vector productsUsedBySelection() const; - bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*); - - // Do the end-of-file tasks; this is only called internally, after - // the appropriate tests have been done. - virtual void reallyCloseFile(); - - /// Ask the OutputModule if we should end the current file. - virtual bool shouldWeCloseFile() const { return false; } - - virtual void write(EventForOutput const&) = 0; - virtual void beginJob() {} - virtual void endJob() {} - virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0; - virtual void writeRun(RunForOutput const&) = 0; - virtual void writeProcessBlock(ProcessBlockForOutput const&) {} - virtual void openFile(FileBlock const&) {} - virtual bool isFileOpen() const { return true; } - virtual void preallocStreams(unsigned int) {} - virtual void preallocLumis(unsigned int) {} virtual void preallocate(PreallocationConfiguration const&) {} virtual void doBeginStream_(StreamID) {} virtual void doEndStream_(StreamID) {} @@ -258,30 +84,13 @@ namespace edm { virtual void doStreamEndLuminosityBlock_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {} virtual void doStreamEndLuminosityBlockSummary_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doBeginRun_(RunForOutput const&) {} virtual void doBeginRunSummary_(RunForOutput const&, EventSetup const&) {} - virtual void doEndRun_(RunForOutput const&) {} virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {} - virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {} virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {} virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doRespondToOpenInputFile_(FileBlock const&) {} - virtual void doRespondToCloseInputFile_(FileBlock const&) {} virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {} - virtual void setProcessesWithSelectedMergeableRunProducts(std::set const&) {} - virtual bool hasAcquire() const { return false; } - bool hasAccumulator() const { return false; } - - void keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent); - - void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; } - - bool limitReached() const { return remainingEvents_ == 0; } }; } // namespace global } // namespace edm diff --git a/FWCore/Framework/interface/limited/OutputModuleBase.h b/FWCore/Framework/interface/limited/OutputModuleBase.h index 1fa70f257003b..03ab172ea65e3 100644 --- a/FWCore/Framework/interface/limited/OutputModuleBase.h +++ b/FWCore/Framework/interface/limited/OutputModuleBase.h @@ -17,54 +17,17 @@ // // system include files -#include -#include -#include -#include -#include -#include -#include -#include // user include files -#include "DataFormats/Provenance/interface/BranchID.h" -#include "DataFormats/Provenance/interface/BranchIDList.h" -#include "DataFormats/Provenance/interface/ModuleDescription.h" -#include "DataFormats/Provenance/interface/SelectedProducts.h" - -#include "FWCore/Common/interface/FWCoreCommonFwd.h" -#include "FWCore/Common/interface/OutputProcessBlockHelper.h" -#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h" -#include "FWCore/Framework/interface/Frameworkfwd.h" -#include "FWCore/Framework/interface/ProductSelectorRules.h" -#include "FWCore/Framework/interface/ProductSelector.h" -#include "FWCore/Framework/interface/EDConsumerBase.h" -#include "FWCore/Framework/interface/getAllTriggerNames.h" -#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" -#include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Framework/interface/OutputModuleCore.h" #include "FWCore/Concurrency/interface/LimitedTaskQueue.h" -#include "FWCore/Concurrency/interface/WaitingTaskHolder.h" // forward declarations namespace edm { - class MergeableRunProductMetadata; - class ModuleCallingContext; - class PreallocationConfiguration; - class ActivityRegistry; - class ThinnedAssociationsHelper; - - template - class OutputModuleCommunicatorT; - - namespace maker { - template - class ModuleHolderT; - } - namespace limited { - class OutputModuleBase : public EDConsumerBase { + class OutputModuleBase : public core::OutputModuleCore { public: template friend class edm::maker::ModuleHolderT; @@ -75,40 +38,13 @@ namespace edm { typedef OutputModuleBase ModuleType; explicit OutputModuleBase(ParameterSet const& pset); - ~OutputModuleBase() override; OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving - /// Accessor for maximum number of events to be written. - /// -1 is used for unlimited. - int maxEvents() const { return maxEvents_; } - - /// Accessor for remaining number of events to be written. - /// -1 is used for unlimited. - int remainingEvents() const { return remainingEvents_; } - - bool selected(BranchDescription const& desc) const; - - void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&); - std::string const& processName() const { return process_name_; } - SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; } - std::array const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; } - - static void fillDescription(ParameterSetDescription& desc); - static void fillDescriptions(ConfigurationDescriptions& descriptions); - static const std::string& baseType(); - static void prevalidate(ConfigurationDescriptions&); - - bool wantAllEvents() const { return wantAllEvents_; } - - BranchIDLists const* branchIDLists() const; - - OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; } - - ThinnedAssociationsHelper const* thinnedAssociationsHelper() const; - - const ModuleDescription& moduleDescription() const { return moduleDescription_; } + static void fillDescription( + ParameterSetDescription& desc, + std::vector const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings()); //Output modules always need writeRun and writeLumi to be called bool wantsGlobalRuns() const { return true; } @@ -123,14 +59,9 @@ namespace edm { LimitedTaskQueue& queue() { return queue_; } protected: - ModuleDescription const& description() const; - - ParameterSetID selectorConfig() const { return selector_config_id_; } - void doPreallocate(PreallocationConfiguration const&); void doBeginJob(); - void doEndJob(); void doBeginStream(StreamID id); void doEndStream(StreamID id); @@ -141,115 +72,14 @@ namespace edm { ModuleCallingContext const&, Principal const&) const {} - void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - - void setEventSelectionInfo( - std::map>> const& outputModulePathPositions, - bool anyProductProduced); - - void configure(OutputModuleDescription const& desc); - - std::map const& droppedBranchIDToKeptBranchID() { - return droppedBranchIDToKeptBranchID_; - } - private: - int maxEvents_; - std::atomic remainingEvents_; - - // TODO: Give OutputModule - // an interface (protected?) that supplies client code with the - // needed functionality *without* giving away implementation - // details ... don't just return a reference to keptProducts_, because - // we are looking to have the flexibility to change the - // implementation of keptProducts_ without modifying clients. When this - // change is made, we'll have a one-time-only task of modifying - // clients (classes derived from OutputModule) to use the - // newly-introduced interface. - // TODO: Consider using shared pointers here? - - // keptProducts_ are pointers to the BranchDescription objects describing - // the branches we are to write. - // - // We do not own the BranchDescriptions to which we point. - SelectedProductsForBranchType keptProducts_; - std::array hasNewlyDroppedBranch_; - - std::string process_name_; - ProductSelectorRules productSelectorRules_; - ProductSelector productSelector_; - ModuleDescription moduleDescription_; - - bool wantAllEvents_; - std::vector selectors_; - ParameterSet selectEvents_; - // ID of the ParameterSet that configured the event selector - // subsystem. - ParameterSetID selector_config_id_; - - // needed because of possible EDAliases. - // filled in only if key and value are different. - std::map droppedBranchIDToKeptBranchID_; - edm::propagate_const> branchIDLists_; - BranchIDLists const* origBranchIDLists_; - - edm::propagate_const> thinnedAssociationsHelper_; - std::map keepAssociation_; - - OutputProcessBlockHelper outputProcessBlockHelper_; - LimitedTaskQueue queue_; - //------------------------------------------------------------------ - // private member functions - //------------------------------------------------------------------ - - void updateBranchIDListsWithKeptAliases(); - - void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*); - void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*); - void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*); - void doOpenFile(FileBlock const& fb); - void doRespondToOpenInputFile(FileBlock const& fb); - void doRespondToCloseInputFile(FileBlock const& fb); - void doRespondToCloseOutputFile() {} - void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {} - std::string workerType() const { return "WorkerT"; } - /// Tell the OutputModule that is must end the current file. - void doCloseFile(); - void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {} - bool needToRunSelection() const; - std::vector productsUsedBySelection() const; - bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*); - - // Do the end-of-file tasks; this is only called internally, after - // the appropriate tests have been done. - virtual void reallyCloseFile(); - - /// Ask the OutputModule if we should end the current file. - virtual bool shouldWeCloseFile() const { return false; } - - virtual void write(EventForOutput const&) = 0; - virtual void beginJob() {} - virtual void endJob() {} - virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0; - virtual void writeRun(RunForOutput const&) = 0; - virtual void writeProcessBlock(ProcessBlockForOutput const&) {} - virtual void openFile(FileBlock const&) const {} - virtual bool isFileOpen() const { return true; } - virtual void preallocStreams(unsigned int) {} - virtual void preallocLumis(unsigned int) {} virtual void preallocate(PreallocationConfiguration const&) {} virtual void doBeginStream_(StreamID) {} virtual void doEndStream_(StreamID) {} @@ -260,29 +90,11 @@ namespace edm { virtual void doStreamEndLuminosityBlock_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {} virtual void doStreamEndLuminosityBlockSummary_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doBeginRun_(RunForOutput const&) {} virtual void doBeginRunSummary_(RunForOutput const&, EventSetup const&) {} - virtual void doEndRun_(RunForOutput const&) {} virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {} - virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {} virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {} virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {} - virtual void doRespondToOpenInputFile_(FileBlock const&) {} - virtual void doRespondToCloseInputFile_(FileBlock const&) {} - - virtual void setProcessesWithSelectedMergeableRunProducts(std::set const&) {} - bool hasAcquire() const { return false; } - bool hasAccumulator() const { return false; } - - void keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent); - - void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; } - - bool limitReached() const { return remainingEvents_ == 0; } }; } // namespace limited } // namespace edm diff --git a/FWCore/Framework/interface/one/OutputModuleBase.h b/FWCore/Framework/interface/one/OutputModuleBase.h index 8b5a5e674e46a..faed03c77751e 100644 --- a/FWCore/Framework/interface/one/OutputModuleBase.h +++ b/FWCore/Framework/interface/one/OutputModuleBase.h @@ -19,54 +19,19 @@ // // system include files -#include -#include -#include -#include -#include -#include -#include // user include files -#include "DataFormats/Provenance/interface/BranchID.h" -#include "DataFormats/Provenance/interface/BranchIDList.h" -#include "DataFormats/Provenance/interface/ModuleDescription.h" -#include "DataFormats/Provenance/interface/SelectedProducts.h" - -#include "FWCore/Common/interface/FWCoreCommonFwd.h" -#include "FWCore/Common/interface/OutputProcessBlockHelper.h" -#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h" -#include "FWCore/Framework/interface/Frameworkfwd.h" -#include "FWCore/Framework/interface/ProductSelectorRules.h" -#include "FWCore/Framework/interface/ProductSelector.h" -#include "FWCore/Framework/interface/EDConsumerBase.h" -#include "FWCore/Framework/interface/getAllTriggerNames.h" +#include "FWCore/Framework/interface/OutputModuleCore.h" #include "FWCore/Framework/interface/SharedResourcesAcquirer.h" -#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" -#include "FWCore/Concurrency/interface/WaitingTaskHolder.h" -#include "FWCore/Utilities/interface/propagate_const.h" // forward declarations namespace edm { - class MergeableRunProductMetadata; - class ModuleCallingContext; - class PreallocationConfiguration; - class ActivityRegistry; - class ThinnedAssociationsHelper; class SubProcessParentageHelper; - template - class OutputModuleCommunicatorT; - - namespace maker { - template - class ModuleHolderT; - } - namespace one { - class OutputModuleBase : public EDConsumerBase { + class OutputModuleBase : public core::OutputModuleCore { public: template friend class edm::maker::ModuleHolderT; @@ -77,33 +42,10 @@ namespace edm { typedef OutputModuleBase ModuleType; explicit OutputModuleBase(ParameterSet const& pset); - ~OutputModuleBase() override; OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving - /// Accessor for maximum number of events to be written. - /// -1 is used for unlimited. - int maxEvents() const { return maxEvents_; } - - /// Accessor for remaining number of events to be written. - /// -1 is used for unlimited. - int remainingEvents() const { return remainingEvents_; } - - bool selected(BranchDescription const& desc) const; - - void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&); - std::string const& processName() const { return process_name_; } - SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; } - std::array const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; } - - static void fillDescription( - ParameterSetDescription& desc, - std::vector const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings()); - static void fillDescriptions(ConfigurationDescriptions& descriptions); - static const std::string& baseType(); - static void prevalidate(ConfigurationDescriptions&); - //Output modules always need writeRun and writeLumi to be called virtual bool wantsProcessBlocks() const = 0; virtual bool wantsInputProcessBlocks() const = 0; @@ -116,163 +58,34 @@ namespace edm { virtual SerialTaskQueue* globalLuminosityBlocksQueue() { return nullptr; } SharedResourcesAcquirer& sharedResourcesAcquirer() { return resourcesAcquirer_; } - bool wantAllEvents() const { return wantAllEvents_; } - - BranchIDLists const* branchIDLists(); - - OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; } - - ThinnedAssociationsHelper const* thinnedAssociationsHelper() const; - SubProcessParentageHelper const* subProcessParentageHelper() const { return subProcessParentageHelper_; } - const ModuleDescription& moduleDescription() const { return moduleDescription_; } - protected: - ModuleDescription const& description() const; - - ParameterSetID selectorConfig() const { return selector_config_id_; } - void doPreallocate(PreallocationConfiguration const&); - virtual void preallocLumis(unsigned int); void doBeginJob(); - void doEndJob(); bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*); - void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {} - bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*); - bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*); - - void setEventSelectionInfo( - std::map>> const& outputModulePathPositions, - bool anyProductProduced); void configure(OutputModuleDescription const& desc); - std::map const& droppedBranchIDToKeptBranchID() { - return droppedBranchIDToKeptBranchID_; - } - private: - int maxEvents_; - std::atomic remainingEvents_; - - // TODO: Give OutputModule - // an interface (protected?) that supplies client code with the - // needed functionality *without* giving away implementation - // details ... don't just return a reference to keptProducts_, because - // we are looking to have the flexibility to change the - // implementation of keptProducts_ without modifying clients. When this - // change is made, we'll have a one-time-only task of modifying - // clients (classes derived from OutputModule) to use the - // newly-introduced interface. - // TODO: Consider using shared pointers here? - - // keptProducts_ are pointers to the BranchDescription objects describing - // the branches we are to write. - // - // We do not own the BranchDescriptions to which we point. - SelectedProductsForBranchType keptProducts_; - std::array hasNewlyDroppedBranch_; - - std::string process_name_; - ProductSelectorRules productSelectorRules_; - ProductSelector productSelector_; - ModuleDescription moduleDescription_; - - bool wantAllEvents_; - std::vector selectors_; - ParameterSet selectEvents_; - // ID of the ParameterSet that configured the event selector - // subsystem. - ParameterSetID selector_config_id_; - - // needed because of possible EDAliases. - // filled in only if key and value are different. - std::map droppedBranchIDToKeptBranchID_; - edm::propagate_const> branchIDLists_; - BranchIDLists const* origBranchIDLists_; - SubProcessParentageHelper const* subProcessParentageHelper_; - edm::propagate_const> thinnedAssociationsHelper_; - std::map keepAssociation_; - - OutputProcessBlockHelper outputProcessBlockHelper_; - SharedResourcesAcquirer resourcesAcquirer_; SerialTaskQueue runQueue_; SerialTaskQueue luminosityBlockQueue_; - //------------------------------------------------------------------ - // private member functions - //------------------------------------------------------------------ - virtual SharedResourcesAcquirer createAcquirer(); - void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*); - void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*); - void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*); - void doOpenFile(FileBlock const& fb); - void doRespondToOpenInputFile(FileBlock const& fb); - void doRespondToCloseInputFile(FileBlock const& fb); - void doRespondToCloseOutputFile() {} - void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {} - std::string workerType() const { return "WorkerT"; } - /// Tell the OutputModule that is must end the current file. - void doCloseFile(); - void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {} - bool needToRunSelection() const; - std::vector productsUsedBySelection() const; - bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*); - - // Do the end-of-file tasks; this is only called internally, after - // the appropriate tests have been done. - virtual void reallyCloseFile(); - - /// Ask the OutputModule if we should end the current file. - virtual bool shouldWeCloseFile() const { return false; } - - virtual void write(EventForOutput const&) = 0; virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask, ModuleCallingContext const& iModuleCallingContext, Principal const& iPrincipal) const {} - virtual void beginJob() {} - virtual void endJob() {} - virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0; - virtual void writeRun(RunForOutput const&) = 0; - virtual void writeProcessBlock(ProcessBlockForOutput const&) {} - virtual void openFile(FileBlock const&) {} - virtual bool isFileOpen() const { return true; } - - virtual void doBeginRun_(RunForOutput const&) {} - virtual void doEndRun_(RunForOutput const&) {} - virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {} - virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {} - virtual void doRespondToOpenInputFile_(FileBlock const&) {} - virtual void doRespondToCloseInputFile_(FileBlock const&) {} - - virtual void setProcessesWithSelectedMergeableRunProducts(std::set const&) {} - bool hasAcquire() const { return false; } - bool hasAccumulator() const { return false; } - - void keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent); - - void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; } - - bool limitReached() const { return remainingEvents_ == 0; } }; } // namespace one } // namespace edm diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index beacc275cdc97..83d1ef1034417 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -769,10 +769,6 @@ namespace edm { int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); } - void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); } - - bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); } - void EventProcessor::clearCounters() { schedule_->clearCounters(); } namespace { diff --git a/FWCore/Framework/src/OutputModuleCore.cc b/FWCore/Framework/src/OutputModuleCore.cc new file mode 100644 index 0000000000000..6e73deaed3f43 --- /dev/null +++ b/FWCore/Framework/src/OutputModuleCore.cc @@ -0,0 +1,409 @@ +// -*- C++ -*- +// +// Package: FWCore/Framework +// Class : OutputModuleCore +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Chris Jones +// Created: Wed, 31 Jul 2013 15:59:19 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/Framework/interface/OutputModuleCore.h" + +#include "DataFormats/Common/interface/Handle.h" +#include "DataFormats/Common/interface/ThinnedAssociation.h" +#include "DataFormats/Common/interface/EndPathStatus.h" +#include "DataFormats/Provenance/interface/BranchDescription.h" +#include "DataFormats/Provenance/interface/BranchKey.h" +#include "DataFormats/Provenance/interface/ProductRegistry.h" +#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h" +#include "FWCore/Framework/interface/EventForOutput.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Framework/src/insertSelectedProcesses.h" +#include "FWCore/Framework/interface/LuminosityBlockForOutput.h" +#include "FWCore/Framework/interface/ProcessBlockForOutput.h" +#include "FWCore/Framework/interface/RunForOutput.h" +#include "FWCore/Framework/src/OutputModuleDescription.h" +#include "FWCore/Framework/interface/TriggerNamesService.h" +#include "FWCore/Framework/src/EventSignalsSentry.h" +#include "FWCore/Framework/interface/PreallocationConfiguration.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/DebugMacros.h" +#include "FWCore/Reflection/interface/DictionaryTools.h" + +namespace edm { + namespace core { + + // ------------------------------------------------------- + OutputModuleCore::OutputModuleCore(ParameterSet const& pset) + : remainingEvents_(-1), + maxEvents_(-1), + keptProducts_(), + hasNewlyDroppedBranch_(), + process_name_(), + productSelectorRules_(pset, "outputCommands", "OutputModule"), + productSelector_(), + moduleDescription_(), + wantAllEvents_(false), + selectors_(), + selector_config_id_(), + droppedBranchIDToKeptBranchID_(), + branchIDLists_(new BranchIDLists), + origBranchIDLists_(nullptr), + thinnedAssociationsHelper_(new ThinnedAssociationsHelper) { + hasNewlyDroppedBranch_.fill(false); + + Service tns; + process_name_ = tns->getProcessName(); + + selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet()); + + selectEvents_.registerIt(); // Just in case this PSet is not registered + + selector_config_id_ = selectEvents_.id(); + + //need to set wantAllEvents_ in constructor + // we will make the remaining selectors once we know how many streams + selectors_.resize(1); + wantAllEvents_ = detail::configureEventSelector( + selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector()); + + //Check if on final path + if (pset.exists("@onFinalPath")) { + onFinalPath_ = pset.getUntrackedParameter("@onFinalPath"); + } + if (onFinalPath_) { + wantAllEvents_ = false; + if (not getAllTriggerNames().empty() and selectors_.front().numberOfTokens() == 0) { + //need to wait for trigger paths to finish + tokensForEndPaths_.push_back(consumes(edm::InputTag("TriggerResults", "", process_name_))); + } + //need to wait for EndPaths to finish + for (auto const& n : tns->getEndPaths()) { + if (n == "@finalPath") { + continue; + } + tokensForEndPaths_.push_back(consumes(edm::InputTag(n, "", process_name_))); + } + } + } + + void OutputModuleCore::configure(OutputModuleDescription const& desc) { + remainingEvents_ = maxEvents_ = desc.maxEvents_; + origBranchIDLists_ = desc.branchIDLists_; + } + + void OutputModuleCore::selectProducts(ProductRegistry const& preg, + ThinnedAssociationsHelper const& thinnedAssociationsHelper, + ProcessBlockHelperBase const& processBlockHelper) { + if (productSelector_.initialized()) + return; + productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions()); + + // TODO: See if we can collapse keptProducts_ and productSelector_ into a + // single object. See the notes in the header for ProductSelector + // for more information. + + std::map trueBranchIDToKeptBranchDesc; + std::vector associationDescriptions; + std::set keptProductsInEvent; + std::set processesWithSelectedMergeableRunProducts; + std::set processesWithKeptProcessBlockProducts; + + for (auto const& it : preg.productList()) { + BranchDescription const& desc = it.second; + if (desc.transient()) { + // if the class of the branch is marked transient, output nothing + } else if (!desc.present() && !desc.produced()) { + // else if the branch containing the product has been previously dropped, + // output nothing + } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) { + associationDescriptions.push_back(&desc); + } else if (selected(desc)) { + keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent); + insertSelectedProcesses( + desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts); + } else { + // otherwise, output nothing, + // and mark the fact that there is a newly dropped branch of this type. + hasNewlyDroppedBranch_[desc.branchType()] = true; + } + } + + setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts); + + thinnedAssociationsHelper.selectAssociationProducts( + associationDescriptions, keptProductsInEvent, keepAssociation_); + + for (auto association : associationDescriptions) { + if (keepAssociation_[association->branchID()]) { + keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent); + } else { + hasNewlyDroppedBranch_[association->branchType()] = true; + } + } + + // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept. + ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_); + + thinnedAssociationsHelper_->updateFromParentProcess( + thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_); + outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper); + } + + void OutputModuleCore::updateBranchIDListsWithKeptAliases() { + if (!droppedBranchIDToKeptBranchID_.empty()) { + // Make a private copy of the BranchIDLists. + *branchIDLists_ = *origBranchIDLists_; + // Check for branches dropped while an EDAlias was kept. + for (BranchIDList& branchIDList : *branchIDLists_) { + for (BranchID::value_type& branchID : branchIDList) { + // Replace BranchID of each dropped branch with that of the kept + // alias, so the alias branch will have the product ID of the original branch. + std::map::const_iterator iter = + droppedBranchIDToKeptBranchID_.find(branchID); + if (iter != droppedBranchIDToKeptBranchID_.end()) { + branchID = iter->second; + } + } + } + } + } + + void OutputModuleCore::keepThisBranch(BranchDescription const& desc, + std::map& trueBranchIDToKeptBranchDesc, + std::set& keptProductsInEvent) { + ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc); + + EDGetToken token; + + std::vector missingDictionaries; + if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) { + std::string context("Calling OutputModuleCore::keepThisBranch, checking dictionaries for kept types"); + throwMissingDictionariesException(missingDictionaries, context); + } + + switch (desc.branchType()) { + case InEvent: { + if (desc.produced()) { + keptProductsInEvent.insert(desc.originalBranchID()); + } else { + keptProductsInEvent.insert(desc.branchID()); + } + token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, + InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()}); + break; + } + case InLumi: { + token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, + InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); + break; + } + case InRun: { + token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, + InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); + break; + } + case InProcess: { + token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, + InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); + break; + } + default: + assert(false); + break; + } + // Now put it in the list of selected branches. + keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token)); + } + + OutputModuleCore::~OutputModuleCore() {} + + void OutputModuleCore::doPreallocate_(PreallocationConfiguration const& iPC) { + auto nstreams = iPC.numberOfStreams(); + selectors_.resize(nstreams); + + preallocLumis(iPC.numberOfLuminosityBlocks()); + + bool seenFirst = false; + for (auto& s : selectors_) { + if (seenFirst) { + detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector()); + } else { + seenFirst = true; + } + } + } + + void OutputModuleCore::preallocLumis(unsigned int) {} + + void OutputModuleCore::doBeginJob_() { + this->beginJob(); + if (onFinalPath_) { + //this stops prefetching of the data products + resetItemsToGetFrom(edm::InEvent); + } + } + + void OutputModuleCore::doEndJob() { endJob(); } + + bool OutputModuleCore::needToRunSelection() const { return !wantAllEvents_; } + + std::vector OutputModuleCore::productsUsedBySelection() const { + std::vector returnValue; + auto const& s = selectors_[0]; + auto const n = s.numberOfTokens(); + returnValue.reserve(n + tokensForEndPaths_.size()); + + for (unsigned int i = 0; i < n; ++i) { + returnValue.emplace_back(uncheckedIndexFrom(s.token(i))); + } + for (auto token : tokensForEndPaths_) { + returnValue.emplace_back(uncheckedIndexFrom(token)); + } + return returnValue; + } + + bool OutputModuleCore::prePrefetchSelection(StreamID id, + EventPrincipal const& ep, + ModuleCallingContext const* mcc) { + if (wantAllEvents_) + return true; + auto& s = selectors_[id.value()]; + EventForOutput e(ep, moduleDescription_, mcc); + e.setConsumer(this); + return s.wantEvent(e); + } + + bool OutputModuleCore::doEvent_(EventTransitionInfo const& info, + ActivityRegistry* act, + ModuleCallingContext const* mcc) { + { + EventForOutput e(info, moduleDescription_, mcc); + e.setConsumer(this); + EventSignalsSentry sentry(act, mcc); + write(e); + } + //remainingEvents_ is decremented by inheriting classes + return true; + } + + bool OutputModuleCore::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { + RunForOutput r(info, moduleDescription_, mcc, false); + r.setConsumer(this); + doBeginRun_(r); + return true; + } + + bool OutputModuleCore::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { + RunForOutput r(info, moduleDescription_, mcc, true); + r.setConsumer(this); + doEndRun_(r); + return true; + } + + void OutputModuleCore::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) { + ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true); + pb.setConsumer(this); + writeProcessBlock(pb); + } + + void OutputModuleCore::doWriteRun(RunPrincipal const& rp, + ModuleCallingContext const* mcc, + MergeableRunProductMetadata const* mrpm) { + RunForOutput r(rp, moduleDescription_, mcc, true, mrpm); + r.setConsumer(this); + writeRun(r); + } + + bool OutputModuleCore::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { + LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false); + lb.setConsumer(this); + doBeginLuminosityBlock_(lb); + return true; + } + + bool OutputModuleCore::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { + LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true); + lb.setConsumer(this); + doEndLuminosityBlock_(lb); + + return true; + } + + void OutputModuleCore::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, + ModuleCallingContext const* mcc) { + LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true); + lb.setConsumer(this); + writeLuminosityBlock(lb); + } + + void OutputModuleCore::doOpenFile(FileBlock const& fb) { openFile(fb); } + + void OutputModuleCore::doRespondToOpenInputFile(FileBlock const& fb) { + updateBranchIDListsWithKeptAliases(); + doRespondToOpenInputFile_(fb); + } + + void OutputModuleCore::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); } + + void OutputModuleCore::doCloseFile() { + if (isFileOpen()) { + reallyCloseFile(); + } + } + + void OutputModuleCore::reallyCloseFile() {} + + BranchIDLists const* OutputModuleCore::branchIDLists() const { + if (!droppedBranchIDToKeptBranchID_.empty()) { + return branchIDLists_.get(); + } + return origBranchIDLists_; + } + + ThinnedAssociationsHelper const* OutputModuleCore::thinnedAssociationsHelper() const { + return thinnedAssociationsHelper_.get(); + } + + ModuleDescription const& OutputModuleCore::description() const { return moduleDescription_; } + + bool OutputModuleCore::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); } + + void OutputModuleCore::fillDescriptions(ConfigurationDescriptions& descriptions) { + ParameterSetDescription desc; + desc.setUnknown(); + descriptions.addDefault(desc); + } + + void OutputModuleCore::fillDescription(ParameterSetDescription& desc, + std::vector const& defaultOutputCommands) { + ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands); + EventSelector::fillDescription(desc); + desc.addOptionalNode(ParameterDescription("@onFinalPath", false, false), false); + } + + void OutputModuleCore::prevalidate(ConfigurationDescriptions&) {} + + static const std::string kBaseType("OutputModule"); + const std::string& OutputModuleCore::baseType() { return kBaseType; } + + void OutputModuleCore::setEventSelectionInfo( + std::map>> const& outputModulePathPositions, + bool anyProductProduced) { + selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_), + description().moduleLabel(), + outputModulePathPositions, + anyProductProduced); + } + } // namespace core +} // namespace edm diff --git a/FWCore/Framework/src/ProductResolvers.cc b/FWCore/Framework/src/ProductResolvers.cc index 23b2e65c602a7..25de273846e77 100644 --- a/FWCore/Framework/src/ProductResolvers.cc +++ b/FWCore/Framework/src/ProductResolvers.cc @@ -436,15 +436,7 @@ namespace edm { SharedResourcesAcquirer*, ModuleCallingContext const*) const { if (!skipCurrentProcess and worker_) { - return resolveProductImpl([this]() { - edm::Exception ex(errors::UnimplementedFeature); - ex << "Attempting to run unscheduled module without doing prefetching"; - std::ostringstream ost; - ost << "Calling produce method for unscheduled module " << worker_->description()->moduleName() << "/'" - << worker_->description()->moduleLabel() << "'"; - ex.addContext(ost.str()); - throw ex; - }); + return resolveProductImpl([] {}); } return Resolution(nullptr); } diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 8c30a26a14ddf..00fb0cef84037 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -690,8 +690,7 @@ namespace edm { preallocConfig_(prealloc), pathNames_(&tns.getTrigPaths()), endPathNames_(&tns.getEndPaths()), - wantSummary_(tns.wantSummary()), - endpathsAreActive_(true) { + wantSummary_(tns.wantSummary()) { makePathStatusInserters(pathStatusInserters_, *pathNames_, prealloc, @@ -1526,15 +1525,6 @@ namespace edm { } } - void Schedule::enableEndPaths(bool active) { - endpathsAreActive_ = active; - for (auto& s : streamSchedules_) { - s->enableEndPaths(active); - } - } - - bool Schedule::endPathsEnabled() const { return endpathsAreActive_; } - void Schedule::getTriggerReport(TriggerReport& rep) const { rep.eventSummary.totalEvents = 0; rep.eventSummary.totalEventsPassed = 0; diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 43f518d75c1b9..7fd7500d242ab 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -162,7 +162,6 @@ namespace edm { number_of_unscheduled_modules_(0), streamID_(streamID), streamContext_(streamID_, processContext), - endpathsAreActive_(true), skippingEvent_(false) { ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet()); bool hasPath = false; @@ -832,10 +831,6 @@ namespace edm { } } - void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; } - - bool StreamSchedule::endPathsEnabled() const { return endpathsAreActive_; } - static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) { sum.timesVisited += path.timesVisited(which); sum.timesPassed += path.timesPassed(which); diff --git a/FWCore/Framework/src/global/OutputModuleBase.cc b/FWCore/Framework/src/global/OutputModuleBase.cc index 1931724b4e9f8..2989364e93df9 100644 --- a/FWCore/Framework/src/global/OutputModuleBase.cc +++ b/FWCore/Framework/src/global/OutputModuleBase.cc @@ -14,254 +14,29 @@ // user include files #include "FWCore/Framework/interface/global/OutputModuleBase.h" -#include "DataFormats/Common/interface/Handle.h" -#include "DataFormats/Common/interface/ThinnedAssociation.h" -#include "DataFormats/Provenance/interface/BranchDescription.h" -#include "DataFormats/Provenance/interface/BranchKey.h" -#include "DataFormats/Provenance/interface/ProductRegistry.h" -#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h" #include "FWCore/Framework/interface/EventForOutput.h" -#include "FWCore/Framework/interface/EventPrincipal.h" -#include "FWCore/Framework/src/insertSelectedProcesses.h" -#include "FWCore/Framework/interface/LuminosityBlockForOutput.h" -#include "FWCore/Framework/interface/ProcessBlockForOutput.h" -#include "FWCore/Framework/interface/RunForOutput.h" -#include "FWCore/Framework/src/OutputModuleDescription.h" -#include "FWCore/Framework/interface/TriggerNamesService.h" -#include "FWCore/Framework/src/EventSignalsSentry.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" #include "FWCore/Framework/src/EventAcquireSignalsSentry.h" -#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" -#include "FWCore/ParameterSet/interface/ParameterSet.h" -#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" -#include "FWCore/ServiceRegistry/interface/Service.h" -#include "FWCore/Utilities/interface/DebugMacros.h" -#include "FWCore/Reflection/interface/DictionaryTools.h" namespace edm { namespace global { // ------------------------------------------------------- - OutputModuleBase::OutputModuleBase(ParameterSet const& pset) - : maxEvents_(-1), - remainingEvents_(maxEvents_), - keptProducts_(), - hasNewlyDroppedBranch_(), - process_name_(), - productSelectorRules_(pset, "outputCommands", "OutputModule"), - productSelector_(), - moduleDescription_(), - wantAllEvents_(false), - selectors_(), - selector_config_id_(), - droppedBranchIDToKeptBranchID_(), - branchIDLists_(new BranchIDLists), - origBranchIDLists_(nullptr), - thinnedAssociationsHelper_(new ThinnedAssociationsHelper) { - hasNewlyDroppedBranch_.fill(false); - - Service tns; - process_name_ = tns->getProcessName(); - - selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet()); - - selectEvents_.registerIt(); // Just in case this PSet is not registered - - selector_config_id_ = selectEvents_.id(); - - //need to set wantAllEvents_ in constructor - // we will make the remaining selectors once we know how many streams - selectors_.resize(1); - wantAllEvents_ = detail::configureEventSelector( - selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector()); - } - - void OutputModuleBase::configure(OutputModuleDescription const& desc) { - remainingEvents_ = maxEvents_ = desc.maxEvents_; - origBranchIDLists_ = desc.branchIDLists_; - } - - void OutputModuleBase::selectProducts(ProductRegistry const& preg, - ThinnedAssociationsHelper const& thinnedAssociationsHelper, - ProcessBlockHelperBase const& processBlockHelper) { - if (productSelector_.initialized()) - return; - productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions()); - - // TODO: See if we can collapse keptProducts_ and productSelector_ into a - // single object. See the notes in the header for ProductSelector - // for more information. - - std::map trueBranchIDToKeptBranchDesc; - std::vector associationDescriptions; - std::set keptProductsInEvent; - std::set processesWithSelectedMergeableRunProducts; - std::set processesWithKeptProcessBlockProducts; - - for (auto const& it : preg.productList()) { - BranchDescription const& desc = it.second; - if (desc.transient()) { - // if the class of the branch is marked transient, output nothing - } else if (!desc.present() && !desc.produced()) { - // else if the branch containing the product has been previously dropped, - // output nothing - } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) { - associationDescriptions.push_back(&desc); - } else if (selected(desc)) { - keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - insertSelectedProcesses( - desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts); - } else { - // otherwise, output nothing, - // and mark the fact that there is a newly dropped branch of this type. - hasNewlyDroppedBranch_[desc.branchType()] = true; - } - } - - setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts); - - thinnedAssociationsHelper.selectAssociationProducts( - associationDescriptions, keptProductsInEvent, keepAssociation_); - - for (auto association : associationDescriptions) { - if (keepAssociation_[association->branchID()]) { - keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - } else { - hasNewlyDroppedBranch_[association->branchType()] = true; - } - } - - // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept. - ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_); - - thinnedAssociationsHelper_->updateFromParentProcess( - thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_); - outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper); - } - - void OutputModuleBase::updateBranchIDListsWithKeptAliases() { - if (!droppedBranchIDToKeptBranchID_.empty()) { - // Make a private copy of the BranchIDLists. - *branchIDLists_ = *origBranchIDLists_; - // Check for branches dropped while an EDAlias was kept. - for (BranchIDList& branchIDList : *branchIDLists_) { - for (BranchID::value_type& branchID : branchIDList) { - // Replace BranchID of each dropped branch with that of the kept - // alias, so the alias branch will have the product ID of the original branch. - std::map::const_iterator iter = - droppedBranchIDToKeptBranchID_.find(branchID); - if (iter != droppedBranchIDToKeptBranchID_.end()) { - branchID = iter->second; - } - } - } - } - } - - void OutputModuleBase::keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent) { - ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc); - - EDGetToken token; - - std::vector missingDictionaries; - if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) { - std::string context("Calling OutputModuleBase::keepThisBranch, checking dictionaries for kept types"); - throwMissingDictionariesException(missingDictionaries, context); - } - - switch (desc.branchType()) { - case InEvent: { - if (desc.produced()) { - keptProductsInEvent.insert(desc.originalBranchID()); - } else { - keptProductsInEvent.insert(desc.branchID()); - } - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()}); - break; - } - case InLumi: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InRun: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InProcess: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - default: - assert(false); - break; - } - // Now put it in the list of selected branches. - keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token)); - } - - OutputModuleBase::~OutputModuleBase() {} + OutputModuleBase::OutputModuleBase(ParameterSet const& pset) : core::OutputModuleCore(pset) {} void OutputModuleBase::doPreallocate(PreallocationConfiguration const& iPC) { auto nstreams = iPC.numberOfStreams(); - selectors_.resize(nstreams); - - bool seenFirst = false; - for (auto& s : selectors_) { - if (seenFirst) { - detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector()); - } else { - seenFirst = true; - } - } preallocStreams(nstreams); - preallocLumis(iPC.numberOfLuminosityBlocks()); + core::OutputModuleCore::doPreallocate_(iPC); preallocate(iPC); } - void OutputModuleBase::doBeginJob() { this->beginJob(); } - - void OutputModuleBase::doEndJob() { endJob(); } - - bool OutputModuleBase::needToRunSelection() const { return !wantAllEvents_; } - - std::vector OutputModuleBase::productsUsedBySelection() const { - std::vector returnValue; - auto const& s = selectors_[0]; - auto const n = s.numberOfTokens(); - returnValue.reserve(n); - - for (unsigned int i = 0; i < n; ++i) { - returnValue.emplace_back(uncheckedIndexFrom(s.token(i))); - } - return returnValue; - } - - bool OutputModuleBase::prePrefetchSelection(StreamID id, - EventPrincipal const& ep, - ModuleCallingContext const* mcc) { - if (wantAllEvents_) - return true; - auto& s = selectors_[id.value()]; - EventForOutput e(ep, moduleDescription_, mcc); - e.setConsumer(this); - return s.wantEvent(e); - } + void OutputModuleBase::doBeginJob() { core::OutputModuleCore::doBeginJob_(); } bool OutputModuleBase::doEvent(EventTransitionInfo const& info, ActivityRegistry* act, ModuleCallingContext const* mcc) { - { - EventForOutput e(info, moduleDescription_, mcc); - e.setConsumer(this); - EventSignalsSentry sentry(act, mcc); - write(e); - } + { core::OutputModuleCore::doEvent_(info, act, mcc); } auto remainingEvents = remainingEvents_.load(); bool keepTrying = remainingEvents > 0; @@ -281,116 +56,10 @@ namespace edm { ActivityRegistry* act, ModuleCallingContext const* mcc, WaitingTaskWithArenaHolder& holder) { - EventForOutput e(info, moduleDescription_, mcc); + EventForOutput e(info, moduleDescription(), mcc); e.setConsumer(this); EventAcquireSignalsSentry sentry(act, mcc); this->doAcquire_(e.streamID(), e, holder); } - - bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, false); - r.setConsumer(this); - doBeginRun_(r); - return true; - } - - bool OutputModuleBase::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, true); - r.setConsumer(this); - doEndRun_(r); - return true; - } - - void OutputModuleBase::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) { - ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true); - pb.setConsumer(this); - writeProcessBlock(pb); - } - - void OutputModuleBase::doWriteRun(RunPrincipal const& rp, - ModuleCallingContext const* mcc, - MergeableRunProductMetadata const* mrpm) { - RunForOutput r(rp, moduleDescription_, mcc, true, mrpm); - r.setConsumer(this); - writeRun(r); - } - - bool OutputModuleBase::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false); - lb.setConsumer(this); - doBeginLuminosityBlock_(lb); - return true; - } - - bool OutputModuleBase::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true); - lb.setConsumer(this); - doEndLuminosityBlock_(lb); - return true; - } - - void OutputModuleBase::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, - ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true); - lb.setConsumer(this); - writeLuminosityBlock(lb); - } - - void OutputModuleBase::doOpenFile(FileBlock const& fb) { openFile(fb); } - - void OutputModuleBase::doRespondToOpenInputFile(FileBlock const& fb) { - updateBranchIDListsWithKeptAliases(); - doRespondToOpenInputFile_(fb); - } - - void OutputModuleBase::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); } - - void OutputModuleBase::doCloseFile() { - if (isFileOpen()) { - reallyCloseFile(); - } - } - - void OutputModuleBase::reallyCloseFile() {} - - BranchIDLists const* OutputModuleBase::branchIDLists() const { - if (!droppedBranchIDToKeptBranchID_.empty()) { - return branchIDLists_.get(); - } - return origBranchIDLists_; - } - - ThinnedAssociationsHelper const* OutputModuleBase::thinnedAssociationsHelper() const { - return thinnedAssociationsHelper_.get(); - } - - ModuleDescription const& OutputModuleBase::description() const { return moduleDescription_; } - - bool OutputModuleBase::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); } - - void OutputModuleBase::fillDescriptions(ConfigurationDescriptions& descriptions) { - ParameterSetDescription desc; - desc.setUnknown(); - descriptions.addDefault(desc); - } - - void OutputModuleBase::fillDescription(ParameterSetDescription& desc) { - ProductSelectorRules::fillDescription(desc, "outputCommands"); - EventSelector::fillDescription(desc); - } - - void OutputModuleBase::prevalidate(ConfigurationDescriptions&) {} - - static const std::string kBaseType("OutputModule"); - const std::string& OutputModuleBase::baseType() { return kBaseType; } - - void OutputModuleBase::setEventSelectionInfo( - std::map > > const& outputModulePathPositions, - bool anyProductProduced) { - selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_), - description().moduleLabel(), - outputModulePathPositions, - anyProductProduced); - } } // namespace global } // namespace edm diff --git a/FWCore/Framework/src/limited/OutputModuleBase.cc b/FWCore/Framework/src/limited/OutputModuleBase.cc index 0a5156070efc5..f2ae7a04e4d60 100644 --- a/FWCore/Framework/src/limited/OutputModuleBase.cc +++ b/FWCore/Framework/src/limited/OutputModuleBase.cc @@ -13,255 +13,30 @@ // user include files #include "FWCore/Framework/interface/limited/OutputModuleBase.h" - -#include "DataFormats/Common/interface/Handle.h" -#include "DataFormats/Common/interface/ThinnedAssociation.h" -#include "DataFormats/Provenance/interface/BranchDescription.h" -#include "DataFormats/Provenance/interface/BranchKey.h" -#include "DataFormats/Provenance/interface/ProductRegistry.h" -#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h" -#include "FWCore/Framework/interface/EventForOutput.h" -#include "FWCore/Framework/interface/EventPrincipal.h" -#include "FWCore/Framework/src/insertSelectedProcesses.h" -#include "FWCore/Framework/interface/LuminosityBlockForOutput.h" -#include "FWCore/Framework/interface/ProcessBlockForOutput.h" -#include "FWCore/Framework/interface/RunForOutput.h" -#include "FWCore/Framework/src/OutputModuleDescription.h" -#include "FWCore/Framework/interface/TriggerNamesService.h" -#include "FWCore/Framework/src/EventSignalsSentry.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" -#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" -#include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" -#include "FWCore/ServiceRegistry/interface/Service.h" -#include "FWCore/Utilities/interface/DebugMacros.h" -#include "FWCore/Reflection/interface/DictionaryTools.h" namespace edm { namespace limited { // ------------------------------------------------------- OutputModuleBase::OutputModuleBase(ParameterSet const& pset) - : maxEvents_(-1), - remainingEvents_(maxEvents_), - keptProducts_(), - hasNewlyDroppedBranch_(), - process_name_(), - productSelectorRules_(pset, "outputCommands", "OutputModule"), - productSelector_(), - moduleDescription_(), - wantAllEvents_(false), - selectors_(), - selector_config_id_(), - droppedBranchIDToKeptBranchID_(), - branchIDLists_(new BranchIDLists), - origBranchIDLists_(nullptr), - thinnedAssociationsHelper_(new ThinnedAssociationsHelper), - queue_(pset.getUntrackedParameter("concurrencyLimit")) { - hasNewlyDroppedBranch_.fill(false); - - Service tns; - process_name_ = tns->getProcessName(); - - selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet()); - - selectEvents_.registerIt(); // Just in case this PSet is not registered - - selector_config_id_ = selectEvents_.id(); - - //need to set wantAllEvents_ in constructor - // we will make the remaining selectors once we know how many streams - selectors_.resize(1); - wantAllEvents_ = detail::configureEventSelector( - selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector()); - } - - void OutputModuleBase::configure(OutputModuleDescription const& desc) { - remainingEvents_ = maxEvents_ = desc.maxEvents_; - origBranchIDLists_ = desc.branchIDLists_; - } - - void OutputModuleBase::selectProducts(ProductRegistry const& preg, - ThinnedAssociationsHelper const& thinnedAssociationsHelper, - ProcessBlockHelperBase const& processBlockHelper) { - if (productSelector_.initialized()) - return; - productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions()); - - // TODO: See if we can collapse keptProducts_ and productSelector_ into a - // single object. See the notes in the header for ProductSelector - // for more information. - - std::map trueBranchIDToKeptBranchDesc; - std::vector associationDescriptions; - std::set keptProductsInEvent; - std::set processesWithSelectedMergeableRunProducts; - std::set processesWithKeptProcessBlockProducts; - - for (auto const& it : preg.productList()) { - BranchDescription const& desc = it.second; - if (desc.transient()) { - // if the class of the branch is marked transient, output nothing - } else if (!desc.present() && !desc.produced()) { - // else if the branch containing the product has been previously dropped, - // output nothing - } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) { - associationDescriptions.push_back(&desc); - } else if (selected(desc)) { - keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - insertSelectedProcesses( - desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts); - } else { - // otherwise, output nothing, - // and mark the fact that there is a newly dropped branch of this type. - hasNewlyDroppedBranch_[desc.branchType()] = true; - } - } - - setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts); - - thinnedAssociationsHelper.selectAssociationProducts( - associationDescriptions, keptProductsInEvent, keepAssociation_); - - for (auto association : associationDescriptions) { - if (keepAssociation_[association->branchID()]) { - keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - } else { - hasNewlyDroppedBranch_[association->branchType()] = true; - } - } - - // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept. - ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_); - - thinnedAssociationsHelper_->updateFromParentProcess( - thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_); - outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper); - } - - void OutputModuleBase::updateBranchIDListsWithKeptAliases() { - if (!droppedBranchIDToKeptBranchID_.empty()) { - // Make a private copy of the BranchIDLists. - *branchIDLists_ = *origBranchIDLists_; - // Check for branches dropped while an EDAlias was kept. - for (BranchIDList& branchIDList : *branchIDLists_) { - for (BranchID::value_type& branchID : branchIDList) { - // Replace BranchID of each dropped branch with that of the kept - // alias, so the alias branch will have the product ID of the original branch. - std::map::const_iterator iter = - droppedBranchIDToKeptBranchID_.find(branchID); - if (iter != droppedBranchIDToKeptBranchID_.end()) { - branchID = iter->second; - } - } - } - } - } - - void OutputModuleBase::keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent) { - ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc); - - EDGetToken token; - - std::vector missingDictionaries; - if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) { - std::string context("Calling OutputModuleBase::keepThisBranch, checking dictionaries for kept types"); - throwMissingDictionariesException(missingDictionaries, context); - } - - switch (desc.branchType()) { - case InEvent: { - if (desc.produced()) { - keptProductsInEvent.insert(desc.originalBranchID()); - } else { - keptProductsInEvent.insert(desc.branchID()); - } - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()}); - break; - } - case InLumi: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InRun: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InProcess: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - default: - assert(false); - break; - } - // Now put it in the list of selected branches. - keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token)); - } - - OutputModuleBase::~OutputModuleBase() {} + : core::OutputModuleCore(pset), queue_(pset.getUntrackedParameter("concurrencyLimit")) {} void OutputModuleBase::doPreallocate(PreallocationConfiguration const& iPC) { auto nstreams = iPC.numberOfStreams(); - selectors_.resize(nstreams); - bool seenFirst = false; - for (auto& s : selectors_) { - if (seenFirst) { - detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector()); - } else { - seenFirst = true; - } - } preallocStreams(nstreams); - preallocLumis(iPC.numberOfLuminosityBlocks()); + core::OutputModuleCore::doPreallocate_(iPC); preallocate(iPC); } - void OutputModuleBase::doBeginJob() { this->beginJob(); } - - void OutputModuleBase::doEndJob() { endJob(); } - - bool OutputModuleBase::needToRunSelection() const { return !wantAllEvents_; } - - std::vector OutputModuleBase::productsUsedBySelection() const { - std::vector returnValue; - auto const& s = selectors_[0]; - auto const n = s.numberOfTokens(); - returnValue.reserve(n); - - for (unsigned int i = 0; i < n; ++i) { - returnValue.emplace_back(uncheckedIndexFrom(s.token(i))); - } - return returnValue; - } - - bool OutputModuleBase::prePrefetchSelection(StreamID id, - EventPrincipal const& ep, - ModuleCallingContext const* mcc) { - if (wantAllEvents_) - return true; - auto& s = selectors_[id.value()]; - EventForOutput e(ep, moduleDescription_, mcc); - e.setConsumer(this); - return s.wantEvent(e); - } + void OutputModuleBase::doBeginJob() { core::OutputModuleCore::doBeginJob_(); } bool OutputModuleBase::doEvent(EventTransitionInfo const& info, ActivityRegistry* act, ModuleCallingContext const* mcc) { - { - EventForOutput e(info, moduleDescription_, mcc); - e.setConsumer(this); - EventSignalsSentry sentry(act, mcc); - write(e); - } + { core::OutputModuleCore::doEvent_(info, act, mcc); } auto remainingEvents = remainingEvents_.load(); bool keepTrying = remainingEvents > 0; @@ -277,111 +52,10 @@ namespace edm { return true; } - bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, false); - r.setConsumer(this); - doBeginRun_(r); - return true; - } - - bool OutputModuleBase::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, true); - r.setConsumer(this); - doEndRun_(r); - return true; - } - - void OutputModuleBase::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) { - ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true); - pb.setConsumer(this); - writeProcessBlock(pb); - } - - void OutputModuleBase::doWriteRun(RunPrincipal const& rp, - ModuleCallingContext const* mcc, - MergeableRunProductMetadata const* mrpm) { - RunForOutput r(rp, moduleDescription_, mcc, true, mrpm); - r.setConsumer(this); - writeRun(r); - } - - bool OutputModuleBase::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false); - lb.setConsumer(this); - doBeginLuminosityBlock_(lb); - return true; - } - - bool OutputModuleBase::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true); - lb.setConsumer(this); - doEndLuminosityBlock_(lb); - return true; - } - - void OutputModuleBase::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, - ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true); - lb.setConsumer(this); - writeLuminosityBlock(lb); - } - - void OutputModuleBase::doOpenFile(FileBlock const& fb) { openFile(fb); } - - void OutputModuleBase::doRespondToOpenInputFile(FileBlock const& fb) { - updateBranchIDListsWithKeptAliases(); - doRespondToOpenInputFile_(fb); - } - - void OutputModuleBase::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); } - - void OutputModuleBase::doCloseFile() { - if (isFileOpen()) { - reallyCloseFile(); - } - } - - void OutputModuleBase::reallyCloseFile() {} - - BranchIDLists const* OutputModuleBase::branchIDLists() const { - if (!droppedBranchIDToKeptBranchID_.empty()) { - return branchIDLists_.get(); - } - return origBranchIDLists_; - } - - ThinnedAssociationsHelper const* OutputModuleBase::thinnedAssociationsHelper() const { - return thinnedAssociationsHelper_.get(); - } - - ModuleDescription const& OutputModuleBase::description() const { return moduleDescription_; } - - bool OutputModuleBase::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); } - - void OutputModuleBase::fillDescriptions(ConfigurationDescriptions& descriptions) { - ParameterSetDescription desc; - desc.setUnknown(); - descriptions.addDefault(desc); - } - - void OutputModuleBase::fillDescription(ParameterSetDescription& desc) { - ProductSelectorRules::fillDescription(desc, "outputCommands"); - EventSelector::fillDescription(desc); + void OutputModuleBase::fillDescription(ParameterSetDescription& desc, + std::vector const& defaultOutputCommands) { + core::OutputModuleCore::fillDescription(desc, defaultOutputCommands); desc.addUntracked("concurrencyLimit", 1); } - - void OutputModuleBase::prevalidate(ConfigurationDescriptions&) {} - - static const std::string kBaseType("OutputModule"); - const std::string& OutputModuleBase::baseType() { return kBaseType; } - - void OutputModuleBase::setEventSelectionInfo( - std::map > > const& outputModulePathPositions, - bool anyProductProduced) { - selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_), - description().moduleLabel(), - outputModulePathPositions, - anyProductProduced); - } } // namespace limited } // namespace edm diff --git a/FWCore/Framework/src/one/OutputModuleBase.cc b/FWCore/Framework/src/one/OutputModuleBase.cc index 9a85555ff6b32..2105a0456ae30 100644 --- a/FWCore/Framework/src/one/OutputModuleBase.cc +++ b/FWCore/Framework/src/one/OutputModuleBase.cc @@ -18,6 +18,7 @@ #include "DataFormats/Common/interface/Handle.h" #include "DataFormats/Common/interface/ThinnedAssociation.h" +#include "DataFormats/Common/interface/EndPathStatus.h" #include "DataFormats/Provenance/interface/BranchDescription.h" #include "DataFormats/Provenance/interface/BranchKey.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" @@ -43,339 +44,35 @@ namespace edm { namespace one { // ------------------------------------------------------- - OutputModuleBase::OutputModuleBase(ParameterSet const& pset) - : maxEvents_(-1), - remainingEvents_(maxEvents_), - keptProducts_(), - hasNewlyDroppedBranch_(), - process_name_(), - productSelectorRules_(pset, "outputCommands", "OutputModule"), - productSelector_(), - moduleDescription_(), - wantAllEvents_(false), - selectors_(), - selector_config_id_(), - droppedBranchIDToKeptBranchID_(), - branchIDLists_(new BranchIDLists), - origBranchIDLists_(nullptr), - thinnedAssociationsHelper_(new ThinnedAssociationsHelper) { - hasNewlyDroppedBranch_.fill(false); - - Service tns; - process_name_ = tns->getProcessName(); - - selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet()); - - selectEvents_.registerIt(); // Just in case this PSet is not registered - - selector_config_id_ = selectEvents_.id(); - - //need to set wantAllEvents_ in constructor - // we will make the remaining selectors once we know how many streams - selectors_.resize(1); - wantAllEvents_ = detail::configureEventSelector( - selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector()); - } + OutputModuleBase::OutputModuleBase(ParameterSet const& pset) : core::OutputModuleCore(pset) {} void OutputModuleBase::configure(OutputModuleDescription const& desc) { - remainingEvents_ = maxEvents_ = desc.maxEvents_; - origBranchIDLists_ = desc.branchIDLists_; + core::OutputModuleCore::configure(desc); subProcessParentageHelper_ = desc.subProcessParentageHelper_; } - void OutputModuleBase::selectProducts(ProductRegistry const& preg, - ThinnedAssociationsHelper const& thinnedAssociationsHelper, - ProcessBlockHelperBase const& processBlockHelper) { - if (productSelector_.initialized()) - return; - productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions()); - - // TODO: See if we can collapse keptProducts_ and productSelector_ into a - // single object. See the notes in the header for ProductSelector - // for more information. - - std::map trueBranchIDToKeptBranchDesc; - std::vector associationDescriptions; - std::set keptProductsInEvent; - std::set processesWithSelectedMergeableRunProducts; - std::set processesWithKeptProcessBlockProducts; - - for (auto const& it : preg.productList()) { - BranchDescription const& desc = it.second; - if (desc.transient()) { - // if the class of the branch is marked transient, output nothing - } else if (!desc.present() && !desc.produced()) { - // else if the branch containing the product has been previously dropped, - // output nothing - } else if (desc.unwrappedType() == typeid(ThinnedAssociation)) { - associationDescriptions.push_back(&desc); - } else if (selected(desc)) { - keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - insertSelectedProcesses( - desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts); - } else { - // otherwise, output nothing, - // and mark the fact that there is a newly dropped branch of this type. - hasNewlyDroppedBranch_[desc.branchType()] = true; - } - } - - setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts); - - thinnedAssociationsHelper.selectAssociationProducts( - associationDescriptions, keptProductsInEvent, keepAssociation_); - - for (auto association : associationDescriptions) { - if (keepAssociation_[association->branchID()]) { - keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent); - } else { - hasNewlyDroppedBranch_[association->branchType()] = true; - } - } - - // Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept. - ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_); - - thinnedAssociationsHelper_->updateFromParentProcess( - thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_); - outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper); - } - - void OutputModuleBase::keepThisBranch(BranchDescription const& desc, - std::map& trueBranchIDToKeptBranchDesc, - std::set& keptProductsInEvent) { - ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc); - - EDGetToken token; - - std::vector missingDictionaries; - if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) { - std::string context("Calling OutputModuleBase::keepThisBranch, checking dictionaries for kept types"); - throwMissingDictionariesException(missingDictionaries, context); - } - - switch (desc.branchType()) { - case InEvent: { - if (desc.produced()) { - keptProductsInEvent.insert(desc.originalBranchID()); - } else { - keptProductsInEvent.insert(desc.branchID()); - } - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()}); - break; - } - case InLumi: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InRun: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - case InProcess: { - token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE}, - InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName())); - break; - } - default: - assert(false); - break; - } - // Now put it in the list of selected branches. - keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token)); - } - - OutputModuleBase::~OutputModuleBase() {} - SharedResourcesAcquirer OutputModuleBase::createAcquirer() { return SharedResourcesAcquirer{ std::vector>(1, std::make_shared())}; } void OutputModuleBase::doPreallocate(PreallocationConfiguration const& iPC) { - auto nstreams = iPC.numberOfStreams(); - selectors_.resize(nstreams); - - preallocLumis(iPC.numberOfLuminosityBlocks()); - - bool seenFirst = false; - for (auto& s : selectors_) { - if (seenFirst) { - detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector()); - } else { - seenFirst = true; - } - } + core::OutputModuleCore::doPreallocate_(iPC); } - void OutputModuleBase::preallocLumis(unsigned int) {} - void OutputModuleBase::doBeginJob() { resourcesAcquirer_ = createAcquirer(); - this->beginJob(); - } - - void OutputModuleBase::doEndJob() { endJob(); } - - bool OutputModuleBase::needToRunSelection() const { return !wantAllEvents_; } - - std::vector OutputModuleBase::productsUsedBySelection() const { - std::vector returnValue; - auto const& s = selectors_[0]; - auto const n = s.numberOfTokens(); - returnValue.reserve(n); - - for (unsigned int i = 0; i < n; ++i) { - returnValue.emplace_back(uncheckedIndexFrom(s.token(i))); - } - return returnValue; - } - - bool OutputModuleBase::prePrefetchSelection(StreamID id, - EventPrincipal const& ep, - ModuleCallingContext const* mcc) { - if (wantAllEvents_) - return true; - auto& s = selectors_[id.value()]; - EventForOutput e(ep, moduleDescription_, mcc); - e.setConsumer(this); - return s.wantEvent(e); + core::OutputModuleCore::doBeginJob_(); } bool OutputModuleBase::doEvent(EventTransitionInfo const& info, ActivityRegistry* act, ModuleCallingContext const* mcc) { - { - EventForOutput e(info, moduleDescription_, mcc); - e.setConsumer(this); - EventSignalsSentry sentry(act, mcc); - write(e); - } + { core::OutputModuleCore::doEvent_(info, act, mcc); } if (remainingEvents_ > 0) { --remainingEvents_; } return true; } - - bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, false); - r.setConsumer(this); - doBeginRun_(r); - return true; - } - - bool OutputModuleBase::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) { - RunForOutput r(info, moduleDescription_, mcc, true); - r.setConsumer(this); - doEndRun_(r); - return true; - } - - void OutputModuleBase::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) { - ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true); - pb.setConsumer(this); - writeProcessBlock(pb); - } - - void OutputModuleBase::doWriteRun(RunPrincipal const& rp, - ModuleCallingContext const* mcc, - MergeableRunProductMetadata const* mrpm) { - RunForOutput r(rp, moduleDescription_, mcc, true, mrpm); - r.setConsumer(this); - writeRun(r); - } - - bool OutputModuleBase::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false); - lb.setConsumer(this); - doBeginLuminosityBlock_(lb); - return true; - } - - bool OutputModuleBase::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true); - lb.setConsumer(this); - doEndLuminosityBlock_(lb); - - return true; - } - - void OutputModuleBase::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, - ModuleCallingContext const* mcc) { - LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true); - lb.setConsumer(this); - writeLuminosityBlock(lb); - } - - void OutputModuleBase::doOpenFile(FileBlock const& fb) { openFile(fb); } - - void OutputModuleBase::doRespondToOpenInputFile(FileBlock const& fb) { doRespondToOpenInputFile_(fb); } - - void OutputModuleBase::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); } - - void OutputModuleBase::doCloseFile() { - if (isFileOpen()) { - reallyCloseFile(); - } - } - - void OutputModuleBase::reallyCloseFile() {} - - BranchIDLists const* OutputModuleBase::branchIDLists() { - if (!droppedBranchIDToKeptBranchID_.empty()) { - // Make a private copy of the BranchIDLists. - *branchIDLists_ = *origBranchIDLists_; - // Check for branches dropped while an EDAlias was kept. - for (BranchIDList& branchIDList : *branchIDLists_) { - for (BranchID::value_type& branchID : branchIDList) { - // Replace BranchID of each dropped branch with that of the kept alias, so the alias branch will have the product ID of the original branch. - std::map::const_iterator iter = - droppedBranchIDToKeptBranchID_.find(branchID); - if (iter != droppedBranchIDToKeptBranchID_.end()) { - branchID = iter->second; - } - } - } - return branchIDLists_.get(); - } - return origBranchIDLists_; - } - - ThinnedAssociationsHelper const* OutputModuleBase::thinnedAssociationsHelper() const { - return thinnedAssociationsHelper_.get(); - } - - ModuleDescription const& OutputModuleBase::description() const { return moduleDescription_; } - - bool OutputModuleBase::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); } - - void OutputModuleBase::fillDescriptions(ConfigurationDescriptions& descriptions) { - ParameterSetDescription desc; - desc.setUnknown(); - descriptions.addDefault(desc); - } - - void OutputModuleBase::fillDescription(ParameterSetDescription& desc, - std::vector const& defaultOutputCommands) { - ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands); - EventSelector::fillDescription(desc); - } - - void OutputModuleBase::prevalidate(ConfigurationDescriptions&) {} - - static const std::string kBaseType("OutputModule"); - const std::string& OutputModuleBase::baseType() { return kBaseType; } - - void OutputModuleBase::setEventSelectionInfo( - std::map>> const& outputModulePathPositions, - bool anyProductProduced) { - selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_), - description().moduleLabel(), - outputModulePathPositions, - anyProductProduced); - } } // namespace one } // namespace edm diff --git a/FWCore/Integration/test/BuildFile.xml b/FWCore/Integration/test/BuildFile.xml index b1082508dbdda..8c0ab8259f948 100644 --- a/FWCore/Integration/test/BuildFile.xml +++ b/FWCore/Integration/test/BuildFile.xml @@ -529,5 +529,6 @@ + diff --git a/FWCore/Integration/test/test_finalpath.sh b/FWCore/Integration/test/test_finalpath.sh new file mode 100755 index 0000000000000..5383fb4757134 --- /dev/null +++ b/FWCore/Integration/test/test_finalpath.sh @@ -0,0 +1,79 @@ +#!/bin/bash + +LOCAL_TEST_DIR=${CMSSW_BASE}/src/FWCore/Integration/test +LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH} + +# Pass in name and status +function die { echo $1: status $2 ; echo === Log file === ; cat ${3:-/dev/null} ; echo === End log file === ; exit $2; } + +pushd ${LOCAL_TMP_DIR} + +cat < finalpath_expected_empty.log +EOF + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py >& finalpath.log || die "failed test_finalpath_cfg.py" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $? + + +cat < finalpath_expected_not_found.log +did not find thing '' TEST +did not find thing '' TEST +did not find thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule --task >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_not_found.log - || die "differences for test_finalpath_cfg.py --schedule --task" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --endpath" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --endpath" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --endpath" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --schedule --endpath" $? + + +cat < finalpath_expected_found.log +found thing '' TEST +found thing '' TEST +found thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task --schedule" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task" $? + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --path --task --schedule" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task --schedule" $? + + +cat < finalpath_expected_filter.log +did not find thing '' TEST +found thing '' TEST +did not find thing '' TEST +found thing 'beginLumi' TEST +found thing 'endLumi' TEST +found thing 'beginRun' TEST +found thing 'endRun' TEST +EOF + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --filter >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_filter.log - || die "differences for test_finalpath_cfg.py --path --filter" $? + + +cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --filter --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter --task" $? +grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_filter.log - || die "differences for test_finalpath_cfg.py --path --filter --task" $? diff --git a/FWCore/Integration/test/test_finalpath_cfg.py b/FWCore/Integration/test/test_finalpath_cfg.py new file mode 100644 index 0000000000000..d427dc9f5a0b9 --- /dev/null +++ b/FWCore/Integration/test/test_finalpath_cfg.py @@ -0,0 +1,69 @@ +import FWCore.ParameterSet.Config as cms +import argparse +import sys + +parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test FinalPath.') + +parser.add_argument("--schedule", help="use cms.Schedule", action="store_true") +parser.add_argument("--task", help="put EDProducer into a task", action="store_true") +parser.add_argument("--path", help="put a consumer of the product onto a Path", action="store_true") +parser.add_argument("--endpath", help="put a consumer of the product onto an EndPath", action="store_true") +parser.add_argument("--filter", action="store_true") +parser.add_argument("--tracer", help="add Tracer service", action="store_true") + +print(sys.argv) +argv = sys.argv[:] +if '--' in argv: + argv.remove("--") +args, unknown = parser.parse_known_args(argv) + + +process = cms.Process("TEST") + +process.MessageLogger.cerr.INFO.limit = 10000 + +process.source = cms.Source("EmptySource") + +process.maxEvents.input = 3 + +process.thing = cms.EDProducer("ThingProducer") + +scheduledPaths =[] +if args.path: + print("adding Path") + process.otherThing = cms.EDProducer("OtherThingProducer", thingTag = cms.InputTag("thing")) + p = cms.Path() + if args.filter: + process.fltr = cms.EDFilter("Prescaler", prescaleFactor = cms.int32(2), prescaleOffset=cms.int32(0)) + p += process.fltr + if not args.task: + p += process.thing + p += process.otherThing + process.p = p + scheduledPaths.append(process.p) + if args.task: + process.p.associate(cms.Task(process.thing)) + +if args.endpath: + print("adding EndPath") + process.out2 = cms.OutputModule("AsciiOutputModule",outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*")) + process.o = cms.EndPath(process.out2) + scheduledPaths.append(process.o) + if args.task: + process.o.associate(cms.Task(process.thing)) + +process.out = cms.OutputModule("GetProductCheckerOutputModule", verbose= cms.untracked.bool(True), outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*")) +process.f = cms.FinalPath(process.out) + +if args.schedule: + print("adding Schedule") + scheduledPaths.append(process.f) + process.schedule = cms.Schedule(*scheduledPaths) + if args.task: + process.schedule.associate(cms.Task(process.thing)) + +if args.tracer: + process.add_(cms.Service("Tracer")) + +process.options.numberOfThreads=3 +process.options.numberOfStreams=1 diff --git a/FWCore/Modules/src/GetProductCheckerOutputModule.cc b/FWCore/Modules/src/GetProductCheckerOutputModule.cc index 6338fd0808015..d010b2bfcc83d 100644 --- a/FWCore/Modules/src/GetProductCheckerOutputModule.cc +++ b/FWCore/Modules/src/GetProductCheckerOutputModule.cc @@ -24,6 +24,7 @@ #include "FWCore/Utilities/interface/ProductKindOfType.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" namespace edm { class ModuleCallingContext; @@ -40,6 +41,7 @@ namespace edm { void write(EventForOutput const& e) override; void writeLuminosityBlock(LuminosityBlockForOutput const&) override; void writeRun(RunForOutput const&) override; + const bool verbose_; }; // @@ -54,7 +56,9 @@ namespace edm { // constructors and destructor // GetProductCheckerOutputModule::GetProductCheckerOutputModule(ParameterSet const& iPSet) - : one::OutputModuleBase(iPSet), one::OutputModule<>(iPSet) {} + : one::OutputModuleBase(iPSet), + one::OutputModule<>(iPSet), + verbose_(iPSet.getUntrackedParameter("verbose")) {} // GetProductCheckerOutputModule::GetProductCheckerOutputModule(GetProductCheckerOutputModule const& rhs) { // // do actual copying here; @@ -77,12 +81,23 @@ namespace edm { // member functions // template - static void check(T const& p, std::string const& id, SelectedProducts const& iProducts) { + static void check(T const& p, std::string const& id, SelectedProducts const& iProducts, bool iVerbose) { for (auto const& product : iProducts) { BranchDescription const* branchDescription = product.first; TypeID const& tid = branchDescription->unwrappedTypeID(); EDGetToken const& token = product.second; BasicHandle bh = p.getByToken(token, tid); + if (iVerbose) { + if (bh.isValid()) { + edm::LogInfo("FoundProduct") << "found " << branchDescription->moduleLabel() << " '" + << branchDescription->productInstanceName() << "' " + << branchDescription->processName(); + } else { + edm::LogInfo("DidNotFindProduct") + << "did not find " << branchDescription->moduleLabel() << " '" << branchDescription->productInstanceName() + << "' " << branchDescription->processName(); + } + } if (nullptr != bh.provenance() && bh.provenance()->branchDescription().branchID() != branchDescription->branchID()) { throw cms::Exception("BranchIDMissMatch") @@ -96,17 +111,17 @@ namespace edm { void GetProductCheckerOutputModule::write(EventForOutput const& e) { std::ostringstream str; str << e.id(); - check(e, str.str(), keptProducts()[InEvent]); + check(e, str.str(), keptProducts()[InEvent], verbose_); } void GetProductCheckerOutputModule::writeLuminosityBlock(LuminosityBlockForOutput const& l) { std::ostringstream str; str << l.id(); - check(l, str.str(), keptProducts()[InLumi]); + check(l, str.str(), keptProducts()[InLumi], verbose_); } void GetProductCheckerOutputModule::writeRun(RunForOutput const& r) { std::ostringstream str; str << r.id(); - check(r, str.str(), keptProducts()[InRun]); + check(r, str.str(), keptProducts()[InRun], verbose_); } // @@ -120,6 +135,7 @@ namespace edm { void GetProductCheckerOutputModule::fillDescriptions(ConfigurationDescriptions& descriptions) { ParameterSetDescription desc; one::OutputModule<>::fillDescription(desc); + desc.addUntracked("verbose", false); descriptions.add("productChecker", desc); } } // namespace edm diff --git a/FWCore/ParameterSet/python/Config.py b/FWCore/ParameterSet/python/Config.py index 9a4f3ad5d3a6a..4e21934db6d8b 100644 --- a/FWCore/ParameterSet/python/Config.py +++ b/FWCore/ParameterSet/python/Config.py @@ -17,7 +17,7 @@ from .Modules import _Module from .SequenceTypes import * from .SequenceTypes import _ModuleSequenceType, _Sequenceable #extend needs it -from .SequenceVisitors import PathValidator, EndPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor +from .SequenceVisitors import PathValidator, EndPathValidator, FinalPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor from .MessageLogger import MessageLogger from . import DictTypes @@ -120,6 +120,7 @@ def __init__(self,name,*Mods): self.__dict__['_Process__outputmodules'] = {} self.__dict__['_Process__paths'] = DictTypes.SortedKeysDict() # have to keep the order self.__dict__['_Process__endpaths'] = DictTypes.SortedKeysDict() # of definition + self.__dict__['_Process__finalpaths'] = DictTypes.SortedKeysDict() # of definition self.__dict__['_Process__sequences'] = {} self.__dict__['_Process__tasks'] = {} self.__dict__['_Process__services'] = {} @@ -289,6 +290,10 @@ def endpaths_(self): """returns a dict of the endpaths that have been added to the Process""" return DictTypes.SortedAndFixedKeysDict(self.__endpaths) endpaths = property(endpaths_,doc="dictionary containing the endpaths for the process") + def finalpaths_(self): + """returns a dict of the finalpaths that have been added to the Process""" + return DictTypes.SortedAndFixedKeysDict(self.__finalpaths) + finalpaths = property(finalpaths_,doc="dictionary containing the finalpaths for the process") def sequences_(self): """returns a dict of the sequences that have been added to the Process""" return DictTypes.FixedKeysDict(self.__sequences) @@ -477,6 +482,9 @@ def __setattr__(self,name,value): s = self.__findFirstUsingModule(self.endpaths,oldValue) if s is not None: raise ValueError(msg1+"endpath "+s.label_()+msg2) + s = self.__findFirstUsingModule(self.finalpaths,oldValue) + if s is not None: + raise ValueError(msg1+"finalpath "+s.label_()+msg2) # In case of EDAlias, raise Exception always to avoid surprises if isinstance(newValue, EDAlias): @@ -503,6 +511,9 @@ def __setattr__(self,name,value): s = self.__findFirstUsingModule(self.endpaths,oldValue) if s is not None: raise ValueError(msg1+"endpath "+s.label_()+msg2) + s = self.__findFirstUsingModule(self.finalpaths,oldValue) + if s is not None: + raise ValueError(msg1+"finalpath "+s.label_()+msg2) if not self.__InExtendCall and (Schedule._itemIsValid(newValue) or isinstance(newValue, Task)): self._replaceInScheduleDirectly(name, newValue) @@ -649,6 +660,13 @@ def _placeEndPath(self,name,mod): except ModuleCloneError as msg: context = format_outerframe(4) raise Exception("%sThe module %s in endpath %s is unknown to the process %s." %(context, msg, name, self._Process__name)) + def _placeFinalPath(self,name,mod): + self._validateSequence(mod, name) + try: + self._place(name, mod, self.__finalpaths) + except ModuleCloneError as msg: + context = format_outerframe(4) + raise Exception("%sThe module %s in finalpath %s is unknown to the process %s." %(context, msg, name, self._Process__name)) def _placeSequence(self,name,mod): self._validateSequence(mod, name) self._place(name, mod, self.__sequences) @@ -804,6 +822,9 @@ def dumpConfig(self, options=PrintOptions()): config+=self._dumpConfigNamedList(self.endpaths_().items(), 'endpath', options) + config+=self._dumpConfigNamedList(self.finalpaths_().items(), + 'finalpath', + options) config+=self._dumpConfigUnnamedList(self.services_().items(), 'service', options) @@ -989,6 +1010,7 @@ def dumpPython(self, options=PrintOptions()): result+=self._dumpPythonList(self._itemsInDependencyOrder(self.sequences), options) result+=self._dumpPythonList(self.paths_(), options) result+=self._dumpPythonList(self.endpaths_(), options) + result+=self._dumpPythonList(self.finalpaths_(), options) result+=self._dumpPythonList(self.aliases_(), options) if not self.schedule_() == None: result += 'process.schedule = ' + self.schedule.dumpPython(options) @@ -1035,6 +1057,7 @@ def splitPython(self, options = PrintOptions()): parts.update(self._splitPythonList('sequences', self._itemsInDependencyOrder(self.sequences), options)) parts.update(self._splitPythonList('paths', self.paths_(), options)) parts.update(self._splitPythonList('paths', self.endpaths_(), options)) + parts.update(self._splitPythonList('paths', self.finalpaths_(), options)) parts.update(self._splitPythonList('modules', self.aliases_(), options)) if options.targetDirectory is not None: @@ -1083,6 +1106,8 @@ def _replaceInSequences(self, label, new): sequenceable.replace(old,new) for sequenceable in self.endpaths.values(): sequenceable.replace(old,new) + for sequenceable in self.finalpaths.values(): + sequenceable.replace(old,new) def _replaceInTasks(self, label, new): old = getattr(self,label) for task in self.tasks.values(): @@ -1148,6 +1173,7 @@ def _insertPaths(self, processPSet, nodeVisitor): scheduledPaths = [] triggerPaths = [] endpaths = [] + finalpaths = [] if self.schedule_() == None: # make one from triggerpaths & endpaths for name in self.paths_(): @@ -1156,19 +1182,47 @@ def _insertPaths(self, processPSet, nodeVisitor): for name in self.endpaths_(): scheduledPaths.append(name) endpaths.append(name) + for name in self.finalpaths_(): + finalpaths.append(name) else: for path in self.schedule_(): pathname = path.label_() - scheduledPaths.append(pathname) if pathname in self.endpaths_(): endpaths.append(pathname) + scheduledPaths.append(pathname) + elif pathname in self.finalpaths_(): + finalpaths.append(pathname) else: + scheduledPaths.append(pathname) triggerPaths.append(pathname) for task in self.schedule_()._tasks: task.resolve(self.__dict__) scheduleTaskValidator = ScheduleTaskValidator() task.visit(scheduleTaskValidator) task.visit(nodeVisitor) + # consolidate all final_paths into one EndPath + endPathWithFinalPathModulesName ="@finalPath" + finalPathEndPath = EndPath() + if finalpaths: + endpaths.append(endPathWithFinalPathModulesName) + scheduledPaths.append(endPathWithFinalPathModulesName) + finalpathValidator = FinalPathValidator() + modulesOnFinalPath = [] + for finalpathname in finalpaths: + iFinalPath = self.finalpaths_()[finalpathname] + iFinalPath.resolve(self.__dict__) + finalpathValidator.setLabel(finalpathname) + iFinalPath.visit(finalpathValidator) + if finalpathValidator.filtersOnFinalpaths or finalpathValidator.producersOnFinalpaths: + names = [p.label_ for p in finalpathValidator.filtersOnFinalpaths] + names.extend( [p.label_ for p in finalpathValidator.producersOnFinalpaths]) + raise RuntimeError("FinalPath %s has non OutputModules %s" % (finalpathname, ",".join(names))) + modulesOnFinalPath.extend(iFinalPath.moduleNames()) + for m in modulesOnFinalPath: + mod = getattr(self, m) + setattr(mod, "@onFinalPath", untracked.bool(True)) + finalPathEndPath += mod + processPSet.addVString(True, "@end_paths", endpaths) processPSet.addVString(True, "@paths", scheduledPaths) # trigger_paths are a little different @@ -1190,19 +1244,25 @@ def _insertPaths(self, processPSet, nodeVisitor): iPath.visit(pathCompositeVisitor) iPath.insertInto(processPSet, triggername, decoratedList) for endpathname in endpaths: - iEndPath = self.endpaths_()[endpathname] + if endpathname is not endPathWithFinalPathModulesName: + iEndPath = self.endpaths_()[endpathname] + else: + iEndPath = finalPathEndPath iEndPath.resolve(self.__dict__) endpathValidator.setLabel(endpathname) lister.initialize() iEndPath.visit(endpathCompositeVisitor) iEndPath.insertInto(processPSet, endpathname, decoratedList) processPSet.addVString(False, "@filters_on_endpaths", endpathValidator.filtersOnEndpaths) + def resolve(self,keepUnresolvedSequencePlaceholders=False): for x in self.paths.values(): x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) for x in self.endpaths.values(): x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) + for x in self.finalpaths.values(): + x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) if not self.schedule_() == None: for task in self.schedule_()._tasks: task.resolve(self.__dict__,keepUnresolvedSequencePlaceholders) @@ -1231,6 +1291,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): schedNames = set(( x.label_() for x in self.schedule_())) names = set(self.paths) names.update(set(self.endpaths)) + names.update(set(self.finalpaths)) unneededPaths = names - schedNames for n in unneededPaths: delattr(self,n) @@ -1241,6 +1302,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): else: pths = list(self.paths.values()) pths.extend(self.endpaths.values()) + pths.extend(self.finalpaths.values()) temp = Schedule(*pths) usedModules=set(temp.moduleNames()) unneededModules = self._pruneModules(self.producers_(), usedModules) @@ -1256,6 +1318,9 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False): for p in self.endpaths.values(): p.visit(sv) p.visit(tv) + for p in self.finalpaths.values(): + p.visit(sv) + p.visit(tv) def removeUnneeded(seqOrTasks, allSequencesOrTasks): _keepSet = set(( s for s in seqOrTasks if s.hasLabel_())) _availableSet = set(allSequencesOrTasks.values()) @@ -1272,7 +1337,7 @@ def removeUnneeded(seqOrTasks, allSequencesOrTasks): print(" modules:"+",".join(unneededModules)) print(" tasks:"+",".join(unneededTaskLabels)) print(" sequences:"+",".join(unneededSeqLabels)) - print(" paths/endpaths:"+",".join(unneededPaths)) + print(" paths/endpaths/finalpaths:"+",".join(unneededPaths)) def _pruneModules(self, d, scheduledNames): moduleNames = set(d.keys()) junk = moduleNames - scheduledNames @@ -2635,6 +2700,36 @@ def testPath(self): t = Path(p.a, p.t1, Task(), p.t1) self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a, cms.Task(), process.t1)\n') + def testFinalPath(self): + p = Process("test") + p.a = OutputModule("MyOutputModule") + p.b = OutputModule("YourOutputModule") + p.c = OutputModule("OurOutputModule") + path = FinalPath(p.a) + path *= p.b + path += p.c + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a*p.b+p.c) + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a+ p.b*p.c) + self.assertEqual(str(path),'a+b+c') + path = FinalPath(p.a*(p.b+p.c)) + self.assertEqual(str(path),'a+b+c') + p.es = ESProducer("AnESProducer") + self.assertRaises(TypeError,FinalPath,p.es) + + t = FinalPath() + self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath()\n') + + t = FinalPath(p.a) + self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath(process.a)\n') + + self.assertRaises(TypeError, FinalPath, Task()) + self.assertRaises(TypeError, FinalPath, p.a, Task()) + + p.prod = EDProducer("prodName") + p.t1 = Task(p.prod) + self.assertRaises(TypeError, FinalPath, p.a, p.t1, Task(), p.t1) def testCloneSequence(self): p = Process("test") a = EDAnalyzer("MyAnalyzer") diff --git a/FWCore/ParameterSet/python/SequenceTypes.py b/FWCore/ParameterSet/python/SequenceTypes.py index 85fbbe26c9af2..2ef9e92cf6b7a 100644 --- a/FWCore/ParameterSet/python/SequenceTypes.py +++ b/FWCore/ParameterSet/python/SequenceTypes.py @@ -651,6 +651,14 @@ def __init__(self,*arg,**argv): def _placeImpl(self,name,proc): proc._placeEndPath(name,self) +class FinalPath(_ModuleSequenceType): + def __init__(self,*arg,**argv): + super(FinalPath,self).__init__(*arg,**argv) + def _placeImpl(self,name,proc): + proc._placeFinalPath(name,self) + def associate(self,task): + raise TypeError("FinalPath does not allow associations with Tasks") + class Sequence(_ModuleSequenceType,_Sequenceable): def __init__(self,*arg,**argv): super(Sequence,self).__init__(*arg,**argv) @@ -746,7 +754,7 @@ def associate(self,*tasks): self._tasks.add(task) @staticmethod def _itemIsValid(item): - return isinstance(item,Path) or isinstance(item,EndPath) + return isinstance(item,Path) or isinstance(item,EndPath) or isinstance(item,FinalPath) def copy(self): import copy aCopy = copy.copy(self) diff --git a/FWCore/ParameterSet/python/SequenceVisitors.py b/FWCore/ParameterSet/python/SequenceVisitors.py index 012c67f1a1c28..933345bc0ef96 100644 --- a/FWCore/ParameterSet/python/SequenceVisitors.py +++ b/FWCore/ParameterSet/python/SequenceVisitors.py @@ -67,6 +67,36 @@ def leave(self,visitee): if isinstance(visitee, Task): self._levelInTasks -= 1 +# Use this on EndPaths +class FinalPathValidator(object): + def __init__(self): + self.__label = '' + self._levelInTasks = 0 + self.filtersOnFinalpaths = [] + self.producersOnFinalpaths = [] + def setLabel(self,label): + self.__label = "'"+label+"' " + def enter(self,visitee): + if visitee.isLeaf(): + if isinstance(visitee, _Labelable): + if not visitee.hasLabel_(): + raise ValueError("FinalPath "+self.__label+"contains a module of type '"+visitee.type_()+"' which has\nno assigned label.") + elif isinstance(visitee, Service): + if not visitee._inProcess: + raise ValueError("FinalPath "+self.__label+"contains a service of type '"+visitee.type_()+"' which is not attached to the process.\n") + if isinstance(visitee, Task): + self._levelInTasks += 1 + if self._levelInTasks > 0: + return + if isinstance(visitee,EDFilter): + self.filtersOnFinalpaths.append(visitee.type_()) + if isinstance(visitee,EDProducer): + self.producersOnFinalpaths.append(visitee.type_()) + def leave(self,visitee): + if self._levelInTasks > 0: + if isinstance(visitee, Task): + self._levelInTasks -= 1 + class NodeVisitor(object): """Form sets of all modules, ESProducers, ESSources and Services in visited objects. Can be used to visit Paths, EndPaths, Sequences or Tasks. Includes in sets objects on sub-Sequences and sub-Tasks"""