Skip to content

Commit

Permalink
Merge pull request #36438 from Dr15Jones/finalPath_12_2
Browse files Browse the repository at this point in the history
Added FinalPath implementation [12_2]
cmsbuild authored Dec 11, 2021
2 parents 266acf3 + 3d438da commit ea5c1fc
Showing 24 changed files with 1,004 additions and 1,633 deletions.
4 changes: 4 additions & 0 deletions FWCore/Framework/interface/EDConsumerBase.h
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@ namespace edm {
void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;
void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const;

//used for prefetching
std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType iType) const {
return itemsToGetFromBranch_[iType];
}
@@ -237,6 +238,9 @@ namespace edm {
iRecord.type());
}

//used for FinalPath
void resetItemsToGetFrom(BranchType iType) { itemsToGetFromBranch_[iType].clear(); }

private:
virtual void registerLateConsumes(eventsetup::ESRecordsToProxyIndices const&) {}
unsigned int recordConsumes(BranchType iBranch, TypeToGet const& iType, edm::InputTag const& iTag, bool iAlwaysGets);
8 changes: 0 additions & 8 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
@@ -152,14 +152,6 @@ namespace edm {
/// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
int totalEventsFailed() const;

/// Turn end_paths "off" if "active" is false;
/// turn end_paths "on" if "active" is true.
void enableEndPaths(bool active);

/// Return true if end_paths are active, and false if they are
/// inactive.
bool endPathsEnabled() const;

/// Clears counters used by trigger report.
void clearCounters();

254 changes: 254 additions & 0 deletions FWCore/Framework/interface/OutputModuleCore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
#ifndef FWCore_Framework_one_OutputModuleCore_h
#define FWCore_Framework_one_OutputModuleCore_h
// -*- C++ -*-
//
// Package: FWCore/Framework
// Class : OutputModuleCore
//
/**\class OutputModuleCore OutputModuleCore.h "FWCore/Framework/interface/one/OutputModuleCore.h"
Description: Base class for all 'one' OutputModules
Usage:
<usage>
*/
//
// Original Author: Chris Jones
// Created: Wed, 31 Jul 2013 15:37:16 GMT
//

// system include files
#include <array>
#include <memory>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <set>

// user include files
#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/SelectedProducts.h"

#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Common/interface/OutputProcessBlockHelper.h"
#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/propagate_const.h"

// forward declarations
namespace edm {

class MergeableRunProductMetadata;
class ModuleCallingContext;
class PreallocationConfiguration;
class ActivityRegistry;
class ThinnedAssociationsHelper;

template <typename T>
class OutputModuleCommunicatorT;

namespace maker {
template <typename T>
class ModuleHolderT;
}

namespace core {

class OutputModuleCore : public EDConsumerBase {
public:
template <typename U>
friend class edm::maker::ModuleHolderT;
template <typename T>
friend class ::edm::WorkerT;
template <typename T>
friend class ::edm::OutputModuleCommunicatorT;
typedef OutputModuleCore ModuleType;

explicit OutputModuleCore(ParameterSet const& pset);
~OutputModuleCore() override;

OutputModuleCore(OutputModuleCore const&) = delete; // Disallow copying and moving
OutputModuleCore& operator=(OutputModuleCore const&) = delete; // Disallow copying and moving

/// Accessor for maximum number of events to be written.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }

/// Accessor for remaining number of events to be written.
/// -1 is used for unlimited.
int remainingEvents() const { return remainingEvents_; }

bool selected(BranchDescription const& desc) const;

void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&);
std::string const& processName() const { return process_name_; }
SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; }
std::array<bool, NumBranchTypes> const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; }

static void fillDescription(
ParameterSetDescription& desc,
std::vector<std::string> const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings());
static void fillDescriptions(ConfigurationDescriptions& descriptions);
static const std::string& baseType();
static void prevalidate(ConfigurationDescriptions&);

bool wantAllEvents() const { return wantAllEvents_; }

BranchIDLists const* branchIDLists() const;

OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; }

ThinnedAssociationsHelper const* thinnedAssociationsHelper() const;

const ModuleDescription& moduleDescription() const { return moduleDescription_; }

protected:
ModuleDescription const& description() const;

ParameterSetID selectorConfig() const { return selector_config_id_; }

void doPreallocate_(PreallocationConfiguration const&);
virtual void preallocLumis(unsigned int);

void doBeginJob_();
void doEndJob();
bool doEvent_(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);
bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);

void setEventSelectionInfo(
std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
bool anyProductProduced);

void configure(OutputModuleDescription const& desc);

std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
return droppedBranchIDToKeptBranchID_;
}

//inheriting classes decrement this in doEvent in a manner that will be thread-safe for that class
std::atomic<int> remainingEvents_;

private:
int maxEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
// needed functionality *without* giving away implementation
// details ... don't just return a reference to keptProducts_, because
// we are looking to have the flexibility to change the
// implementation of keptProducts_ without modifying clients. When this
// change is made, we'll have a one-time-only task of modifying
// clients (classes derived from OutputModule) to use the
// newly-introduced interface.
// TODO: Consider using shared pointers here?

// keptProducts_ are pointers to the BranchDescription objects describing
// the branches we are to write.
//
// We do not own the BranchDescriptions to which we point.
SelectedProductsForBranchType keptProducts_;
std::array<bool, NumBranchTypes> hasNewlyDroppedBranch_;

std::string process_name_;
ProductSelectorRules productSelectorRules_;
ProductSelector productSelector_;
ModuleDescription moduleDescription_;

bool wantAllEvents_;
std::vector<detail::TriggerResultsBasedEventSelector> selectors_;
ParameterSet selectEvents_;
std::vector<EDGetToken> tokensForEndPaths_; //needed for FinalPath
bool onFinalPath_ = false;
// ID of the ParameterSet that configured the event selector
// subsystem.
ParameterSetID selector_config_id_;

// needed because of possible EDAliases.
// filled in only if key and value are different.
std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
edm::propagate_const<std::unique_ptr<BranchIDLists>> branchIDLists_;
BranchIDLists const* origBranchIDLists_;

edm::propagate_const<std::unique_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
std::map<BranchID, bool> keepAssociation_;

OutputProcessBlockHelper outputProcessBlockHelper_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------
void updateBranchIDListsWithKeptAliases();

void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*);
void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
void doRespondToOpenInputFile(FileBlock const& fb);
void doRespondToCloseInputFile(FileBlock const& fb);
void doRespondToCloseOutputFile() {}
void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {}

/// Tell the OutputModule that is must end the current file.
void doCloseFile();

void registerProductsAndCallbacks(OutputModuleCore const*, ProductRegistry const*) {}

bool needToRunSelection() const;
std::vector<ProductResolverIndexAndSkipBit> productsUsedBySelection() const;
bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*);

// Do the end-of-file tasks; this is only called internally, after
// the appropriate tests have been done.
virtual void reallyCloseFile();

/// Ask the OutputModule if we should end the current file.
virtual bool shouldWeCloseFile() const { return false; }

virtual void write(EventForOutput const&) = 0;

virtual void beginJob() {}
virtual void endJob() {}
virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0;
virtual void writeRun(RunForOutput const&) = 0;
virtual void writeProcessBlock(ProcessBlockForOutput const&) {}
virtual void openFile(FileBlock const&) {}
virtual bool isFileOpen() const { return true; }

virtual void doBeginRun_(RunForOutput const&) {}
virtual void doEndRun_(RunForOutput const&) {}
virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doRespondToOpenInputFile_(FileBlock const&) {}
virtual void doRespondToCloseInputFile_(FileBlock const&) {}

virtual void setProcessesWithSelectedMergeableRunProducts(std::set<std::string> const&) {}

bool hasAccumulator() const { return false; }

void keepThisBranch(BranchDescription const& desc,
std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
std::set<BranchID>& keptProductsInEvent);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }

bool limitReached() const { return remainingEvents_ == 0; }
};
} // namespace core
} // namespace edm
#endif
10 changes: 0 additions & 10 deletions FWCore/Framework/interface/Schedule.h
Original file line number Diff line number Diff line change
@@ -253,14 +253,6 @@ namespace edm {
/// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
int totalEventsFailed() const;

/// Turn end_paths "off" if "active" is false;
/// turn end_paths "on" if "active" is true.
void enableEndPaths(bool active);

/// Return true if end_paths are active, and false if they are
/// inactive.
bool endPathsEnabled() const;

/// Return the trigger report information on paths,
/// modules-in-path, modules-in-endpath, and modules.
void getTriggerReport(TriggerReport& rep) const;
@@ -319,8 +311,6 @@ namespace edm {
std::vector<std::string> const* pathNames_;
std::vector<std::string> const* endPathNames_;
bool wantSummary_;

volatile bool endpathsAreActive_;
};

template <typename T>
9 changes: 0 additions & 9 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
@@ -233,14 +233,6 @@ namespace edm {
/// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }

/// Turn end_paths "off" if "active" is false;
/// turn end_paths "on" if "active" is true.
void enableEndPaths(bool active);

/// Return true if end_paths are active, and false if they are
/// inactive.
bool endPathsEnabled() const;

/// Return the trigger report information on paths,
/// modules-in-path, modules-in-endpath, and modules.
void getTriggerReport(TriggerReport& rep) const;
@@ -368,7 +360,6 @@ namespace edm {

StreamID streamID_;
StreamContext streamContext_;
volatile bool endpathsAreActive_;
std::atomic<bool> skippingEvent_;
};

14 changes: 0 additions & 14 deletions FWCore/Framework/interface/SubProcess.h
Original file line number Diff line number Diff line change
@@ -208,20 +208,6 @@ namespace edm {
return schedule_->totalEventsFailed();
}

/// Turn end_paths "off" if "active" is false;
/// Turn end_paths "on" if "active" is true.
void enableEndPaths(bool active) {
ServiceRegistry::Operate operate(serviceToken_);
schedule_->enableEndPaths(active);
for_all(subProcesses_, [active](auto& subProcess) { subProcess.enableEndPaths(active); });
}

/// Return true if end_paths are active, and false if they are inactive.
bool endPathsEnabled() const {
ServiceRegistry::Operate operate(serviceToken_);
return schedule_->endPathsEnabled();
}

/// Return the trigger report information on paths,
/// modules-in-path, modules-in-endpath, and modules.
void getTriggerReport(TriggerReport& rep) const {
195 changes: 2 additions & 193 deletions FWCore/Framework/interface/global/OutputModuleBase.h
Original file line number Diff line number Diff line change
@@ -17,53 +17,16 @@
//

// system include files
#include <array>
#include <memory>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <mutex>
#include <set>

// user include files
#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/SelectedProducts.h"

#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Common/interface/OutputProcessBlockHelper.h"
#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Framework/interface/OutputModuleCore.h"

// forward declarations
namespace edm {

class MergeableRunProductMetadata;
class ModuleCallingContext;
class PreallocationConfiguration;
class ActivityRegistry;
class ThinnedAssociationsHelper;

template <typename T>
class OutputModuleCommunicatorT;

namespace maker {
template <typename T>
class ModuleHolderT;
}

namespace global {

class OutputModuleBase : public EDConsumerBase {
class OutputModuleBase : public core::OutputModuleCore {
public:
template <typename U>
friend class edm::maker::ModuleHolderT;
@@ -74,41 +37,10 @@ namespace edm {
typedef OutputModuleBase ModuleType;

explicit OutputModuleBase(ParameterSet const& pset);
~OutputModuleBase() override;

OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving
OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving

/// Accessor for maximum number of events to be written.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }

/// Accessor for remaining number of events to be written.
/// -1 is used for unlimited.
int remainingEvents() const { return remainingEvents_; }

bool selected(BranchDescription const& desc) const;

void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&);
std::string const& processName() const { return process_name_; }
SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; }
std::array<bool, NumBranchTypes> const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; }

static void fillDescription(ParameterSetDescription& desc);
static void fillDescriptions(ConfigurationDescriptions& descriptions);
static const std::string& baseType();
static void prevalidate(ConfigurationDescriptions&);

bool wantAllEvents() const { return wantAllEvents_; }

BranchIDLists const* branchIDLists() const;

OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; }

ThinnedAssociationsHelper const* thinnedAssociationsHelper() const;

const ModuleDescription& moduleDescription() const { return moduleDescription_; }

//Output modules always need writeRun and writeLumi to be called
bool wantsGlobalRuns() const { return true; }
bool wantsGlobalLuminosityBlocks() const { return true; }
@@ -119,14 +51,9 @@ namespace edm {
virtual bool wantsStreamLuminosityBlocks() const = 0;

protected:
ModuleDescription const& description() const;

ParameterSetID selectorConfig() const { return selector_config_id_; }

void doPreallocate(PreallocationConfiguration const&);

void doBeginJob();
void doEndJob();

void doBeginStream(StreamID id) { doBeginStream_(id); }
void doEndStream(StreamID id) { doEndStream_(id); }
@@ -141,113 +68,12 @@ namespace edm {
ModuleCallingContext const& iModuleCallingContext,
Principal const& iPrincipal) const {}

void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);
bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);

void setEventSelectionInfo(
std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
bool anyProductProduced);

void configure(OutputModuleDescription const& desc);

std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
return droppedBranchIDToKeptBranchID_;
}

private:
int maxEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
// needed functionality *without* giving away implementation
// details ... don't just return a reference to keptProducts_, because
// we are looking to have the flexibility to change the
// implementation of keptProducts_ without modifying clients. When this
// change is made, we'll have a one-time-only task of modifying
// clients (classes derived from OutputModule) to use the
// newly-introduced interface.
// TODO: Consider using shared pointers here?

// keptProducts_ are pointers to the BranchDescription objects describing
// the branches we are to write.
//
// We do not own the BranchDescriptions to which we point.
SelectedProductsForBranchType keptProducts_;
std::array<bool, NumBranchTypes> hasNewlyDroppedBranch_;

std::string process_name_;
ProductSelectorRules productSelectorRules_;
ProductSelector productSelector_;
ModuleDescription moduleDescription_;

bool wantAllEvents_;
std::vector<detail::TriggerResultsBasedEventSelector> selectors_;
ParameterSet selectEvents_;
// ID of the ParameterSet that configured the event selector
// subsystem.
ParameterSetID selector_config_id_;

// needed because of possible EDAliases.
// filled in only if key and value are different.
std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
edm::propagate_const<std::unique_ptr<BranchIDLists>> branchIDLists_;
BranchIDLists const* origBranchIDLists_;

edm::propagate_const<std::unique_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
std::map<BranchID, bool> keepAssociation_;

OutputProcessBlockHelper outputProcessBlockHelper_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------

void updateBranchIDListsWithKeptAliases();

void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*);
void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
void doRespondToOpenInputFile(FileBlock const& fb);
void doRespondToCloseInputFile(FileBlock const& fb);
void doRespondToCloseOutputFile() {}
void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {}

std::string workerType() const { return "WorkerT<edm::global::OutputModuleBase>"; }

/// Tell the OutputModule that is must end the current file.
void doCloseFile();

void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {}

bool needToRunSelection() const;
std::vector<ProductResolverIndexAndSkipBit> productsUsedBySelection() const;
bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*);

// Do the end-of-file tasks; this is only called internally, after
// the appropriate tests have been done.
virtual void reallyCloseFile();

/// Ask the OutputModule if we should end the current file.
virtual bool shouldWeCloseFile() const { return false; }

virtual void write(EventForOutput const&) = 0;
virtual void beginJob() {}
virtual void endJob() {}
virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0;
virtual void writeRun(RunForOutput const&) = 0;
virtual void writeProcessBlock(ProcessBlockForOutput const&) {}
virtual void openFile(FileBlock const&) {}
virtual bool isFileOpen() const { return true; }

virtual void preallocStreams(unsigned int) {}
virtual void preallocLumis(unsigned int) {}
virtual void preallocate(PreallocationConfiguration const&) {}
virtual void doBeginStream_(StreamID) {}
virtual void doEndStream_(StreamID) {}
@@ -258,30 +84,13 @@ namespace edm {
virtual void doStreamEndLuminosityBlock_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doStreamEndLuminosityBlockSummary_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {}

virtual void doBeginRun_(RunForOutput const&) {}
virtual void doBeginRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doEndRun_(RunForOutput const&) {}
virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doRespondToOpenInputFile_(FileBlock const&) {}
virtual void doRespondToCloseInputFile_(FileBlock const&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {}

virtual void setProcessesWithSelectedMergeableRunProducts(std::set<std::string> const&) {}

virtual bool hasAcquire() const { return false; }
bool hasAccumulator() const { return false; }

void keepThisBranch(BranchDescription const& desc,
std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
std::set<BranchID>& keptProductsInEvent);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }

bool limitReached() const { return remainingEvents_ == 0; }
};
} // namespace global
} // namespace edm
198 changes: 5 additions & 193 deletions FWCore/Framework/interface/limited/OutputModuleBase.h
Original file line number Diff line number Diff line change
@@ -17,54 +17,17 @@
//

// system include files
#include <array>
#include <memory>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <mutex>
#include <set>

// user include files
#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/SelectedProducts.h"

#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Common/interface/OutputProcessBlockHelper.h"
#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Framework/interface/OutputModuleCore.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"

// forward declarations
namespace edm {

class MergeableRunProductMetadata;
class ModuleCallingContext;
class PreallocationConfiguration;
class ActivityRegistry;
class ThinnedAssociationsHelper;

template <typename T>
class OutputModuleCommunicatorT;

namespace maker {
template <typename T>
class ModuleHolderT;
}

namespace limited {

class OutputModuleBase : public EDConsumerBase {
class OutputModuleBase : public core::OutputModuleCore {
public:
template <typename U>
friend class edm::maker::ModuleHolderT;
@@ -75,40 +38,13 @@ namespace edm {
typedef OutputModuleBase ModuleType;

explicit OutputModuleBase(ParameterSet const& pset);
~OutputModuleBase() override;

OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving
OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving

/// Accessor for maximum number of events to be written.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }

/// Accessor for remaining number of events to be written.
/// -1 is used for unlimited.
int remainingEvents() const { return remainingEvents_; }

bool selected(BranchDescription const& desc) const;

void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&);
std::string const& processName() const { return process_name_; }
SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; }
std::array<bool, NumBranchTypes> const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; }

static void fillDescription(ParameterSetDescription& desc);
static void fillDescriptions(ConfigurationDescriptions& descriptions);
static const std::string& baseType();
static void prevalidate(ConfigurationDescriptions&);

bool wantAllEvents() const { return wantAllEvents_; }

BranchIDLists const* branchIDLists() const;

OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; }

ThinnedAssociationsHelper const* thinnedAssociationsHelper() const;

const ModuleDescription& moduleDescription() const { return moduleDescription_; }
static void fillDescription(
ParameterSetDescription& desc,
std::vector<std::string> const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings());

//Output modules always need writeRun and writeLumi to be called
bool wantsGlobalRuns() const { return true; }
@@ -123,14 +59,9 @@ namespace edm {
LimitedTaskQueue& queue() { return queue_; }

protected:
ModuleDescription const& description() const;

ParameterSetID selectorConfig() const { return selector_config_id_; }

void doPreallocate(PreallocationConfiguration const&);

void doBeginJob();
void doEndJob();

void doBeginStream(StreamID id);
void doEndStream(StreamID id);
@@ -141,115 +72,14 @@ namespace edm {
ModuleCallingContext const&,
Principal const&) const {}

void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);
bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);

void setEventSelectionInfo(
std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
bool anyProductProduced);

void configure(OutputModuleDescription const& desc);

std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
return droppedBranchIDToKeptBranchID_;
}

private:
int maxEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
// needed functionality *without* giving away implementation
// details ... don't just return a reference to keptProducts_, because
// we are looking to have the flexibility to change the
// implementation of keptProducts_ without modifying clients. When this
// change is made, we'll have a one-time-only task of modifying
// clients (classes derived from OutputModule) to use the
// newly-introduced interface.
// TODO: Consider using shared pointers here?

// keptProducts_ are pointers to the BranchDescription objects describing
// the branches we are to write.
//
// We do not own the BranchDescriptions to which we point.
SelectedProductsForBranchType keptProducts_;
std::array<bool, NumBranchTypes> hasNewlyDroppedBranch_;

std::string process_name_;
ProductSelectorRules productSelectorRules_;
ProductSelector productSelector_;
ModuleDescription moduleDescription_;

bool wantAllEvents_;
std::vector<detail::TriggerResultsBasedEventSelector> selectors_;
ParameterSet selectEvents_;
// ID of the ParameterSet that configured the event selector
// subsystem.
ParameterSetID selector_config_id_;

// needed because of possible EDAliases.
// filled in only if key and value are different.
std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
edm::propagate_const<std::unique_ptr<BranchIDLists>> branchIDLists_;
BranchIDLists const* origBranchIDLists_;

edm::propagate_const<std::unique_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
std::map<BranchID, bool> keepAssociation_;

OutputProcessBlockHelper outputProcessBlockHelper_;

LimitedTaskQueue queue_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------

void updateBranchIDListsWithKeptAliases();

void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*);
void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
void doRespondToOpenInputFile(FileBlock const& fb);
void doRespondToCloseInputFile(FileBlock const& fb);
void doRespondToCloseOutputFile() {}
void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {}

std::string workerType() const { return "WorkerT<edm::limited::OutputModuleBase>"; }

/// Tell the OutputModule that is must end the current file.
void doCloseFile();

void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {}

bool needToRunSelection() const;
std::vector<ProductResolverIndexAndSkipBit> productsUsedBySelection() const;
bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*);

// Do the end-of-file tasks; this is only called internally, after
// the appropriate tests have been done.
virtual void reallyCloseFile();

/// Ask the OutputModule if we should end the current file.
virtual bool shouldWeCloseFile() const { return false; }

virtual void write(EventForOutput const&) = 0;
virtual void beginJob() {}
virtual void endJob() {}
virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0;
virtual void writeRun(RunForOutput const&) = 0;
virtual void writeProcessBlock(ProcessBlockForOutput const&) {}
virtual void openFile(FileBlock const&) const {}
virtual bool isFileOpen() const { return true; }

virtual void preallocStreams(unsigned int) {}
virtual void preallocLumis(unsigned int) {}
virtual void preallocate(PreallocationConfiguration const&) {}
virtual void doBeginStream_(StreamID) {}
virtual void doEndStream_(StreamID) {}
@@ -260,29 +90,11 @@ namespace edm {
virtual void doStreamEndLuminosityBlock_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doStreamEndLuminosityBlockSummary_(StreamID, LuminosityBlockForOutput const&, EventSetup const&) {}

virtual void doBeginRun_(RunForOutput const&) {}
virtual void doBeginRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doEndRun_(RunForOutput const&) {}
virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {}
virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doRespondToOpenInputFile_(FileBlock const&) {}
virtual void doRespondToCloseInputFile_(FileBlock const&) {}

virtual void setProcessesWithSelectedMergeableRunProducts(std::set<std::string> const&) {}

bool hasAcquire() const { return false; }
bool hasAccumulator() const { return false; }

void keepThisBranch(BranchDescription const& desc,
std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
std::set<BranchID>& keptProductsInEvent);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }

bool limitReached() const { return remainingEvents_ == 0; }
};
} // namespace limited
} // namespace edm
191 changes: 2 additions & 189 deletions FWCore/Framework/interface/one/OutputModuleBase.h
Original file line number Diff line number Diff line change
@@ -19,54 +19,19 @@
//

// system include files
#include <array>
#include <memory>
#include <string>
#include <vector>
#include <map>
#include <atomic>
#include <set>

// user include files
#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/BranchIDList.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/SelectedProducts.h"

#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Common/interface/OutputProcessBlockHelper.h"
#include "FWCore/Framework/interface/TriggerResultsBasedEventSelector.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/ProductSelectorRules.h"
#include "FWCore/Framework/interface/ProductSelector.h"
#include "FWCore/Framework/interface/EDConsumerBase.h"
#include "FWCore/Framework/interface/getAllTriggerNames.h"
#include "FWCore/Framework/interface/OutputModuleCore.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ParameterSet/interface/ParameterSetfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/propagate_const.h"

// forward declarations
namespace edm {

class MergeableRunProductMetadata;
class ModuleCallingContext;
class PreallocationConfiguration;
class ActivityRegistry;
class ThinnedAssociationsHelper;
class SubProcessParentageHelper;

template <typename T>
class OutputModuleCommunicatorT;

namespace maker {
template <typename T>
class ModuleHolderT;
}

namespace one {

class OutputModuleBase : public EDConsumerBase {
class OutputModuleBase : public core::OutputModuleCore {
public:
template <typename U>
friend class edm::maker::ModuleHolderT;
@@ -77,33 +42,10 @@ namespace edm {
typedef OutputModuleBase ModuleType;

explicit OutputModuleBase(ParameterSet const& pset);
~OutputModuleBase() override;

OutputModuleBase(OutputModuleBase const&) = delete; // Disallow copying and moving
OutputModuleBase& operator=(OutputModuleBase const&) = delete; // Disallow copying and moving

/// Accessor for maximum number of events to be written.
/// -1 is used for unlimited.
int maxEvents() const { return maxEvents_; }

/// Accessor for remaining number of events to be written.
/// -1 is used for unlimited.
int remainingEvents() const { return remainingEvents_; }

bool selected(BranchDescription const& desc) const;

void selectProducts(ProductRegistry const& preg, ThinnedAssociationsHelper const&, ProcessBlockHelperBase const&);
std::string const& processName() const { return process_name_; }
SelectedProductsForBranchType const& keptProducts() const { return keptProducts_; }
std::array<bool, NumBranchTypes> const& hasNewlyDroppedBranch() const { return hasNewlyDroppedBranch_; }

static void fillDescription(
ParameterSetDescription& desc,
std::vector<std::string> const& iDefaultOutputCommands = ProductSelectorRules::defaultSelectionStrings());
static void fillDescriptions(ConfigurationDescriptions& descriptions);
static const std::string& baseType();
static void prevalidate(ConfigurationDescriptions&);

//Output modules always need writeRun and writeLumi to be called
virtual bool wantsProcessBlocks() const = 0;
virtual bool wantsInputProcessBlocks() const = 0;
@@ -116,163 +58,34 @@ namespace edm {
virtual SerialTaskQueue* globalLuminosityBlocksQueue() { return nullptr; }
SharedResourcesAcquirer& sharedResourcesAcquirer() { return resourcesAcquirer_; }

bool wantAllEvents() const { return wantAllEvents_; }

BranchIDLists const* branchIDLists();

OutputProcessBlockHelper const& outputProcessBlockHelper() const { return outputProcessBlockHelper_; }

ThinnedAssociationsHelper const* thinnedAssociationsHelper() const;

SubProcessParentageHelper const* subProcessParentageHelper() const { return subProcessParentageHelper_; }

const ModuleDescription& moduleDescription() const { return moduleDescription_; }

protected:
ModuleDescription const& description() const;

ParameterSetID selectorConfig() const { return selector_config_id_; }

void doPreallocate(PreallocationConfiguration const&);
virtual void preallocLumis(unsigned int);

void doBeginJob();
void doEndJob();
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
void doEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) {}
bool doBeginRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doEndRun(RunTransitionInfo const&, ModuleCallingContext const*);
bool doBeginLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);
bool doEndLuminosityBlock(LumiTransitionInfo const&, ModuleCallingContext const*);

void setEventSelectionInfo(
std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
bool anyProductProduced);

void configure(OutputModuleDescription const& desc);

std::map<BranchID::value_type, BranchID::value_type> const& droppedBranchIDToKeptBranchID() {
return droppedBranchIDToKeptBranchID_;
}

private:
int maxEvents_;
std::atomic<int> remainingEvents_;

// TODO: Give OutputModule
// an interface (protected?) that supplies client code with the
// needed functionality *without* giving away implementation
// details ... don't just return a reference to keptProducts_, because
// we are looking to have the flexibility to change the
// implementation of keptProducts_ without modifying clients. When this
// change is made, we'll have a one-time-only task of modifying
// clients (classes derived from OutputModule) to use the
// newly-introduced interface.
// TODO: Consider using shared pointers here?

// keptProducts_ are pointers to the BranchDescription objects describing
// the branches we are to write.
//
// We do not own the BranchDescriptions to which we point.
SelectedProductsForBranchType keptProducts_;
std::array<bool, NumBranchTypes> hasNewlyDroppedBranch_;

std::string process_name_;
ProductSelectorRules productSelectorRules_;
ProductSelector productSelector_;
ModuleDescription moduleDescription_;

bool wantAllEvents_;
std::vector<detail::TriggerResultsBasedEventSelector> selectors_;
ParameterSet selectEvents_;
// ID of the ParameterSet that configured the event selector
// subsystem.
ParameterSetID selector_config_id_;

// needed because of possible EDAliases.
// filled in only if key and value are different.
std::map<BranchID::value_type, BranchID::value_type> droppedBranchIDToKeptBranchID_;
edm::propagate_const<std::unique_ptr<BranchIDLists>> branchIDLists_;
BranchIDLists const* origBranchIDLists_;

SubProcessParentageHelper const* subProcessParentageHelper_;

edm::propagate_const<std::unique_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
std::map<BranchID, bool> keepAssociation_;

OutputProcessBlockHelper outputProcessBlockHelper_;

SharedResourcesAcquirer resourcesAcquirer_;
SerialTaskQueue runQueue_;
SerialTaskQueue luminosityBlockQueue_;

//------------------------------------------------------------------
// private member functions
//------------------------------------------------------------------

virtual SharedResourcesAcquirer createAcquirer();

void doWriteProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*);
void doWriteRun(RunPrincipal const& rp, ModuleCallingContext const*, MergeableRunProductMetadata const*);
void doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp, ModuleCallingContext const*);
void doOpenFile(FileBlock const& fb);
void doRespondToOpenInputFile(FileBlock const& fb);
void doRespondToCloseInputFile(FileBlock const& fb);
void doRespondToCloseOutputFile() {}
void doRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) {}

std::string workerType() const { return "WorkerT<edm::one::OutputModuleBase>"; }

/// Tell the OutputModule that is must end the current file.
void doCloseFile();

void registerProductsAndCallbacks(OutputModuleBase const*, ProductRegistry const*) {}

bool needToRunSelection() const;
std::vector<ProductResolverIndexAndSkipBit> productsUsedBySelection() const;
bool prePrefetchSelection(StreamID id, EventPrincipal const&, ModuleCallingContext const*);

// Do the end-of-file tasks; this is only called internally, after
// the appropriate tests have been done.
virtual void reallyCloseFile();

/// Ask the OutputModule if we should end the current file.
virtual bool shouldWeCloseFile() const { return false; }

virtual void write(EventForOutput const&) = 0;
virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
ModuleCallingContext const& iModuleCallingContext,
Principal const& iPrincipal) const {}

virtual void beginJob() {}
virtual void endJob() {}
virtual void writeLuminosityBlock(LuminosityBlockForOutput const&) = 0;
virtual void writeRun(RunForOutput const&) = 0;
virtual void writeProcessBlock(ProcessBlockForOutput const&) {}
virtual void openFile(FileBlock const&) {}
virtual bool isFileOpen() const { return true; }

virtual void doBeginRun_(RunForOutput const&) {}
virtual void doEndRun_(RunForOutput const&) {}
virtual void doBeginLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doEndLuminosityBlock_(LuminosityBlockForOutput const&) {}
virtual void doRespondToOpenInputFile_(FileBlock const&) {}
virtual void doRespondToCloseInputFile_(FileBlock const&) {}

virtual void setProcessesWithSelectedMergeableRunProducts(std::set<std::string> const&) {}

bool hasAcquire() const { return false; }
bool hasAccumulator() const { return false; }

void keepThisBranch(BranchDescription const& desc,
std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
std::set<BranchID>& keptProductsInEvent);

void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }

bool limitReached() const { return remainingEvents_ == 0; }
};
} // namespace one
} // namespace edm
4 changes: 0 additions & 4 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
@@ -769,10 +769,6 @@ namespace edm {

int EventProcessor::totalEventsFailed() const { return schedule_->totalEventsFailed(); }

void EventProcessor::enableEndPaths(bool active) { schedule_->enableEndPaths(active); }

bool EventProcessor::endPathsEnabled() const { return schedule_->endPathsEnabled(); }

void EventProcessor::clearCounters() { schedule_->clearCounters(); }

namespace {
409 changes: 409 additions & 0 deletions FWCore/Framework/src/OutputModuleCore.cc

Large diffs are not rendered by default.

10 changes: 1 addition & 9 deletions FWCore/Framework/src/ProductResolvers.cc
Original file line number Diff line number Diff line change
@@ -436,15 +436,7 @@ namespace edm {
SharedResourcesAcquirer*,
ModuleCallingContext const*) const {
if (!skipCurrentProcess and worker_) {
return resolveProductImpl<true>([this]() {
edm::Exception ex(errors::UnimplementedFeature);
ex << "Attempting to run unscheduled module without doing prefetching";
std::ostringstream ost;
ost << "Calling produce method for unscheduled module " << worker_->description()->moduleName() << "/'"
<< worker_->description()->moduleLabel() << "'";
ex.addContext(ost.str());
throw ex;
});
return resolveProductImpl<false>([] {});
}
return Resolution(nullptr);
}
12 changes: 1 addition & 11 deletions FWCore/Framework/src/Schedule.cc
Original file line number Diff line number Diff line change
@@ -690,8 +690,7 @@ namespace edm {
preallocConfig_(prealloc),
pathNames_(&tns.getTrigPaths()),
endPathNames_(&tns.getEndPaths()),
wantSummary_(tns.wantSummary()),
endpathsAreActive_(true) {
wantSummary_(tns.wantSummary()) {
makePathStatusInserters(pathStatusInserters_,
*pathNames_,
prealloc,
@@ -1526,15 +1525,6 @@ namespace edm {
}
}

void Schedule::enableEndPaths(bool active) {
endpathsAreActive_ = active;
for (auto& s : streamSchedules_) {
s->enableEndPaths(active);
}
}

bool Schedule::endPathsEnabled() const { return endpathsAreActive_; }

void Schedule::getTriggerReport(TriggerReport& rep) const {
rep.eventSummary.totalEvents = 0;
rep.eventSummary.totalEventsPassed = 0;
5 changes: 0 additions & 5 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
@@ -162,7 +162,6 @@ namespace edm {
number_of_unscheduled_modules_(0),
streamID_(streamID),
streamContext_(streamID_, processContext),
endpathsAreActive_(true),
skippingEvent_(false) {
ParameterSet const& opts = proc_pset.getUntrackedParameterSet("options", ParameterSet());
bool hasPath = false;
@@ -832,10 +831,6 @@ namespace edm {
}
}

void StreamSchedule::enableEndPaths(bool active) { endpathsAreActive_ = active; }

bool StreamSchedule::endPathsEnabled() const { return endpathsAreActive_; }

static void fillModuleInPathSummary(Path const& path, size_t which, ModuleInPathSummary& sum) {
sum.timesVisited += path.timesVisited(which);
sum.timesPassed += path.timesPassed(which);
341 changes: 5 additions & 336 deletions FWCore/Framework/src/global/OutputModuleBase.cc

Large diffs are not rendered by default.

340 changes: 7 additions & 333 deletions FWCore/Framework/src/limited/OutputModuleBase.cc

Large diffs are not rendered by default.

315 changes: 6 additions & 309 deletions FWCore/Framework/src/one/OutputModuleBase.cc
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

#include "DataFormats/Common/interface/Handle.h"
#include "DataFormats/Common/interface/ThinnedAssociation.h"
#include "DataFormats/Common/interface/EndPathStatus.h"
#include "DataFormats/Provenance/interface/BranchDescription.h"
#include "DataFormats/Provenance/interface/BranchKey.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
@@ -43,339 +44,35 @@ namespace edm {
namespace one {

// -------------------------------------------------------
OutputModuleBase::OutputModuleBase(ParameterSet const& pset)
: maxEvents_(-1),
remainingEvents_(maxEvents_),
keptProducts_(),
hasNewlyDroppedBranch_(),
process_name_(),
productSelectorRules_(pset, "outputCommands", "OutputModule"),
productSelector_(),
moduleDescription_(),
wantAllEvents_(false),
selectors_(),
selector_config_id_(),
droppedBranchIDToKeptBranchID_(),
branchIDLists_(new BranchIDLists),
origBranchIDLists_(nullptr),
thinnedAssociationsHelper_(new ThinnedAssociationsHelper) {
hasNewlyDroppedBranch_.fill(false);

Service<service::TriggerNamesService> tns;
process_name_ = tns->getProcessName();

selectEvents_ = pset.getUntrackedParameterSet("SelectEvents", ParameterSet());

selectEvents_.registerIt(); // Just in case this PSet is not registered

selector_config_id_ = selectEvents_.id();

//need to set wantAllEvents_ in constructor
// we will make the remaining selectors once we know how many streams
selectors_.resize(1);
wantAllEvents_ = detail::configureEventSelector(
selectEvents_, process_name_, getAllTriggerNames(), selectors_[0], consumesCollector());
}
OutputModuleBase::OutputModuleBase(ParameterSet const& pset) : core::OutputModuleCore(pset) {}

void OutputModuleBase::configure(OutputModuleDescription const& desc) {
remainingEvents_ = maxEvents_ = desc.maxEvents_;
origBranchIDLists_ = desc.branchIDLists_;
core::OutputModuleCore::configure(desc);
subProcessParentageHelper_ = desc.subProcessParentageHelper_;
}

void OutputModuleBase::selectProducts(ProductRegistry const& preg,
ThinnedAssociationsHelper const& thinnedAssociationsHelper,
ProcessBlockHelperBase const& processBlockHelper) {
if (productSelector_.initialized())
return;
productSelector_.initialize(productSelectorRules_, preg.allBranchDescriptions());

// TODO: See if we can collapse keptProducts_ and productSelector_ into a
// single object. See the notes in the header for ProductSelector
// for more information.

std::map<BranchID, BranchDescription const*> trueBranchIDToKeptBranchDesc;
std::vector<BranchDescription const*> associationDescriptions;
std::set<BranchID> keptProductsInEvent;
std::set<std::string> processesWithSelectedMergeableRunProducts;
std::set<std::string> processesWithKeptProcessBlockProducts;

for (auto const& it : preg.productList()) {
BranchDescription const& desc = it.second;
if (desc.transient()) {
// if the class of the branch is marked transient, output nothing
} else if (!desc.present() && !desc.produced()) {
// else if the branch containing the product has been previously dropped,
// output nothing
} else if (desc.unwrappedType() == typeid(ThinnedAssociation)) {
associationDescriptions.push_back(&desc);
} else if (selected(desc)) {
keepThisBranch(desc, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
insertSelectedProcesses(
desc, processesWithSelectedMergeableRunProducts, processesWithKeptProcessBlockProducts);
} else {
// otherwise, output nothing,
// and mark the fact that there is a newly dropped branch of this type.
hasNewlyDroppedBranch_[desc.branchType()] = true;
}
}

setProcessesWithSelectedMergeableRunProducts(processesWithSelectedMergeableRunProducts);

thinnedAssociationsHelper.selectAssociationProducts(
associationDescriptions, keptProductsInEvent, keepAssociation_);

for (auto association : associationDescriptions) {
if (keepAssociation_[association->branchID()]) {
keepThisBranch(*association, trueBranchIDToKeptBranchDesc, keptProductsInEvent);
} else {
hasNewlyDroppedBranch_[association->branchType()] = true;
}
}

// Now fill in a mapping needed in the case that a branch was dropped while its EDAlias was kept.
ProductSelector::fillDroppedToKept(preg, trueBranchIDToKeptBranchDesc, droppedBranchIDToKeptBranchID_);

thinnedAssociationsHelper_->updateFromParentProcess(
thinnedAssociationsHelper, keepAssociation_, droppedBranchIDToKeptBranchID_);
outputProcessBlockHelper_.updateAfterProductSelection(processesWithKeptProcessBlockProducts, processBlockHelper);
}

void OutputModuleBase::keepThisBranch(BranchDescription const& desc,
std::map<BranchID, BranchDescription const*>& trueBranchIDToKeptBranchDesc,
std::set<BranchID>& keptProductsInEvent) {
ProductSelector::checkForDuplicateKeptBranch(desc, trueBranchIDToKeptBranchDesc);

EDGetToken token;

std::vector<std::string> missingDictionaries;
if (!checkDictionary(missingDictionaries, desc.className(), desc.unwrappedType())) {
std::string context("Calling OutputModuleBase::keepThisBranch, checking dictionaries for kept types");
throwMissingDictionariesException(missingDictionaries, context);
}

switch (desc.branchType()) {
case InEvent: {
if (desc.produced()) {
keptProductsInEvent.insert(desc.originalBranchID());
} else {
keptProductsInEvent.insert(desc.branchID());
}
token = consumes(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
InputTag{desc.moduleLabel(), desc.productInstanceName(), desc.processName()});
break;
}
case InLumi: {
token = consumes<InLumi>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
break;
}
case InRun: {
token = consumes<InRun>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
break;
}
case InProcess: {
token = consumes<InProcess>(TypeToGet{desc.unwrappedTypeID(), PRODUCT_TYPE},
InputTag(desc.moduleLabel(), desc.productInstanceName(), desc.processName()));
break;
}
default:
assert(false);
break;
}
// Now put it in the list of selected branches.
keptProducts_[desc.branchType()].push_back(std::make_pair(&desc, token));
}

OutputModuleBase::~OutputModuleBase() {}

SharedResourcesAcquirer OutputModuleBase::createAcquirer() {
return SharedResourcesAcquirer{
std::vector<std::shared_ptr<SerialTaskQueue>>(1, std::make_shared<SerialTaskQueue>())};
}

void OutputModuleBase::doPreallocate(PreallocationConfiguration const& iPC) {
auto nstreams = iPC.numberOfStreams();
selectors_.resize(nstreams);

preallocLumis(iPC.numberOfLuminosityBlocks());

bool seenFirst = false;
for (auto& s : selectors_) {
if (seenFirst) {
detail::configureEventSelector(selectEvents_, process_name_, getAllTriggerNames(), s, consumesCollector());
} else {
seenFirst = true;
}
}
core::OutputModuleCore::doPreallocate_(iPC);
}

void OutputModuleBase::preallocLumis(unsigned int) {}

void OutputModuleBase::doBeginJob() {
resourcesAcquirer_ = createAcquirer();
this->beginJob();
}

void OutputModuleBase::doEndJob() { endJob(); }

bool OutputModuleBase::needToRunSelection() const { return !wantAllEvents_; }

std::vector<ProductResolverIndexAndSkipBit> OutputModuleBase::productsUsedBySelection() const {
std::vector<ProductResolverIndexAndSkipBit> returnValue;
auto const& s = selectors_[0];
auto const n = s.numberOfTokens();
returnValue.reserve(n);

for (unsigned int i = 0; i < n; ++i) {
returnValue.emplace_back(uncheckedIndexFrom(s.token(i)));
}
return returnValue;
}

bool OutputModuleBase::prePrefetchSelection(StreamID id,
EventPrincipal const& ep,
ModuleCallingContext const* mcc) {
if (wantAllEvents_)
return true;
auto& s = selectors_[id.value()];
EventForOutput e(ep, moduleDescription_, mcc);
e.setConsumer(this);
return s.wantEvent(e);
core::OutputModuleCore::doBeginJob_();
}

bool OutputModuleBase::doEvent(EventTransitionInfo const& info,
ActivityRegistry* act,
ModuleCallingContext const* mcc) {
{
EventForOutput e(info, moduleDescription_, mcc);
e.setConsumer(this);
EventSignalsSentry sentry(act, mcc);
write(e);
}
{ core::OutputModuleCore::doEvent_(info, act, mcc); }
if (remainingEvents_ > 0) {
--remainingEvents_;
}
return true;
}

bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
RunForOutput r(info, moduleDescription_, mcc, false);
r.setConsumer(this);
doBeginRun_(r);
return true;
}

bool OutputModuleBase::doEndRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
RunForOutput r(info, moduleDescription_, mcc, true);
r.setConsumer(this);
doEndRun_(r);
return true;
}

void OutputModuleBase::doWriteProcessBlock(ProcessBlockPrincipal const& pbp, ModuleCallingContext const* mcc) {
ProcessBlockForOutput pb(pbp, moduleDescription_, mcc, true);
pb.setConsumer(this);
writeProcessBlock(pb);
}

void OutputModuleBase::doWriteRun(RunPrincipal const& rp,
ModuleCallingContext const* mcc,
MergeableRunProductMetadata const* mrpm) {
RunForOutput r(rp, moduleDescription_, mcc, true, mrpm);
r.setConsumer(this);
writeRun(r);
}

bool OutputModuleBase::doBeginLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
LuminosityBlockForOutput lb(info, moduleDescription_, mcc, false);
lb.setConsumer(this);
doBeginLuminosityBlock_(lb);
return true;
}

bool OutputModuleBase::doEndLuminosityBlock(LumiTransitionInfo const& info, ModuleCallingContext const* mcc) {
LuminosityBlockForOutput lb(info, moduleDescription_, mcc, true);
lb.setConsumer(this);
doEndLuminosityBlock_(lb);

return true;
}

void OutputModuleBase::doWriteLuminosityBlock(LuminosityBlockPrincipal const& lbp,
ModuleCallingContext const* mcc) {
LuminosityBlockForOutput lb(lbp, moduleDescription_, mcc, true);
lb.setConsumer(this);
writeLuminosityBlock(lb);
}

void OutputModuleBase::doOpenFile(FileBlock const& fb) { openFile(fb); }

void OutputModuleBase::doRespondToOpenInputFile(FileBlock const& fb) { doRespondToOpenInputFile_(fb); }

void OutputModuleBase::doRespondToCloseInputFile(FileBlock const& fb) { doRespondToCloseInputFile_(fb); }

void OutputModuleBase::doCloseFile() {
if (isFileOpen()) {
reallyCloseFile();
}
}

void OutputModuleBase::reallyCloseFile() {}

BranchIDLists const* OutputModuleBase::branchIDLists() {
if (!droppedBranchIDToKeptBranchID_.empty()) {
// Make a private copy of the BranchIDLists.
*branchIDLists_ = *origBranchIDLists_;
// Check for branches dropped while an EDAlias was kept.
for (BranchIDList& branchIDList : *branchIDLists_) {
for (BranchID::value_type& branchID : branchIDList) {
// Replace BranchID of each dropped branch with that of the kept alias, so the alias branch will have the product ID of the original branch.
std::map<BranchID::value_type, BranchID::value_type>::const_iterator iter =
droppedBranchIDToKeptBranchID_.find(branchID);
if (iter != droppedBranchIDToKeptBranchID_.end()) {
branchID = iter->second;
}
}
}
return branchIDLists_.get();
}
return origBranchIDLists_;
}

ThinnedAssociationsHelper const* OutputModuleBase::thinnedAssociationsHelper() const {
return thinnedAssociationsHelper_.get();
}

ModuleDescription const& OutputModuleBase::description() const { return moduleDescription_; }

bool OutputModuleBase::selected(BranchDescription const& desc) const { return productSelector_.selected(desc); }

void OutputModuleBase::fillDescriptions(ConfigurationDescriptions& descriptions) {
ParameterSetDescription desc;
desc.setUnknown();
descriptions.addDefault(desc);
}

void OutputModuleBase::fillDescription(ParameterSetDescription& desc,
std::vector<std::string> const& defaultOutputCommands) {
ProductSelectorRules::fillDescription(desc, "outputCommands", defaultOutputCommands);
EventSelector::fillDescription(desc);
}

void OutputModuleBase::prevalidate(ConfigurationDescriptions&) {}

static const std::string kBaseType("OutputModule");
const std::string& OutputModuleBase::baseType() { return kBaseType; }

void OutputModuleBase::setEventSelectionInfo(
std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
bool anyProductProduced) {
selector_config_id_ = detail::registerProperSelectionInfo(getParameterSet(selector_config_id_),
description().moduleLabel(),
outputModulePathPositions,
anyProductProduced);
}
} // namespace one
} // namespace edm
1 change: 1 addition & 0 deletions FWCore/Integration/test/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -529,5 +529,6 @@
<flags PRE_TEST="TestIntegrationProcessBlock23"/>
<flags PRE_TEST="TestIntegrationProcessBlock24"/>
</test>
<test name="TestIntegrationFinalPath" command="test_finalpath.sh"/>

</environment>
79 changes: 79 additions & 0 deletions FWCore/Integration/test/test_finalpath.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/bin/bash

LOCAL_TEST_DIR=${CMSSW_BASE}/src/FWCore/Integration/test
LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH}

# Pass in name and status
function die { echo $1: status $2 ; echo === Log file === ; cat ${3:-/dev/null} ; echo === End log file === ; exit $2; }

pushd ${LOCAL_TMP_DIR}

cat <<EOF > finalpath_expected_empty.log
EOF

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py >& finalpath.log || die "failed test_finalpath_cfg.py" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $?


cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --schedule" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py" $?


cat <<EOF > finalpath_expected_not_found.log
did not find thing '' TEST
did not find thing '' TEST
did not find thing '' TEST
found thing 'beginLumi' TEST
found thing 'endLumi' TEST
found thing 'beginRun' TEST
found thing 'endRun' TEST
EOF
cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule --task >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --task" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_not_found.log - || die "differences for test_finalpath_cfg.py --schedule --task" $?


cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --endpath" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --endpath" $?

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --schedule --endpath >& finalpath.log || die "failed test_finalpath_cfg.py --schedule --endpath" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_empty.log - || die "differences for test_finalpath_cfg.py --schedule --endpath" $?


cat <<EOF > finalpath_expected_found.log
found thing '' TEST
found thing '' TEST
found thing '' TEST
found thing 'beginLumi' TEST
found thing 'endLumi' TEST
found thing 'beginRun' TEST
found thing 'endRun' TEST
EOF
cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task" $?

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --endpath --task --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --endpath --task --schedule" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --endpath --task --schedule" $?

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --task" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task" $?

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --task --schedule >& finalpath.log || die "failed test_finalpath_cfg.py --path --task --schedule" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_found.log - || die "differences for test_finalpath_cfg.py --path --task --schedule" $?


cat <<EOF > finalpath_expected_filter.log
did not find thing '' TEST
found thing '' TEST
did not find thing '' TEST
found thing 'beginLumi' TEST
found thing 'endLumi' TEST
found thing 'beginRun' TEST
found thing 'endRun' TEST
EOF

cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --filter >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_filter.log - || die "differences for test_finalpath_cfg.py --path --filter" $?


cmsRun ${LOCAL_TEST_DIR}/test_finalpath_cfg.py -- --path --filter --task >& finalpath.log || die "failed test_finalpath_cfg.py --path --filter --task" $?
grep "thing '.*' TEST" finalpath.log | diff finalpath_expected_filter.log - || die "differences for test_finalpath_cfg.py --path --filter --task" $?
69 changes: 69 additions & 0 deletions FWCore/Integration/test/test_finalpath_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import FWCore.ParameterSet.Config as cms
import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test FinalPath.')

parser.add_argument("--schedule", help="use cms.Schedule", action="store_true")
parser.add_argument("--task", help="put EDProducer into a task", action="store_true")
parser.add_argument("--path", help="put a consumer of the product onto a Path", action="store_true")
parser.add_argument("--endpath", help="put a consumer of the product onto an EndPath", action="store_true")
parser.add_argument("--filter", action="store_true")
parser.add_argument("--tracer", help="add Tracer service", action="store_true")

print(sys.argv)
argv = sys.argv[:]
if '--' in argv:
argv.remove("--")
args, unknown = parser.parse_known_args(argv)


process = cms.Process("TEST")

process.MessageLogger.cerr.INFO.limit = 10000

process.source = cms.Source("EmptySource")

process.maxEvents.input = 3

process.thing = cms.EDProducer("ThingProducer")

scheduledPaths =[]
if args.path:
print("adding Path")
process.otherThing = cms.EDProducer("OtherThingProducer", thingTag = cms.InputTag("thing"))
p = cms.Path()
if args.filter:
process.fltr = cms.EDFilter("Prescaler", prescaleFactor = cms.int32(2), prescaleOffset=cms.int32(0))
p += process.fltr
if not args.task:
p += process.thing
p += process.otherThing
process.p = p
scheduledPaths.append(process.p)
if args.task:
process.p.associate(cms.Task(process.thing))

if args.endpath:
print("adding EndPath")
process.out2 = cms.OutputModule("AsciiOutputModule",outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*"))
process.o = cms.EndPath(process.out2)
scheduledPaths.append(process.o)
if args.task:
process.o.associate(cms.Task(process.thing))

process.out = cms.OutputModule("GetProductCheckerOutputModule", verbose= cms.untracked.bool(True), outputCommands = cms.untracked.vstring("drop *", "keep *_thing_*_*"))
process.f = cms.FinalPath(process.out)

if args.schedule:
print("adding Schedule")
scheduledPaths.append(process.f)
process.schedule = cms.Schedule(*scheduledPaths)
if args.task:
process.schedule.associate(cms.Task(process.thing))

if args.tracer:
process.add_(cms.Service("Tracer"))

process.options.numberOfThreads=3
process.options.numberOfStreams=1
26 changes: 21 additions & 5 deletions FWCore/Modules/src/GetProductCheckerOutputModule.cc
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
#include "FWCore/Utilities/interface/ProductKindOfType.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

namespace edm {
class ModuleCallingContext;
@@ -40,6 +41,7 @@ namespace edm {
void write(EventForOutput const& e) override;
void writeLuminosityBlock(LuminosityBlockForOutput const&) override;
void writeRun(RunForOutput const&) override;
const bool verbose_;
};

//
@@ -54,7 +56,9 @@ namespace edm {
// constructors and destructor
//
GetProductCheckerOutputModule::GetProductCheckerOutputModule(ParameterSet const& iPSet)
: one::OutputModuleBase(iPSet), one::OutputModule<>(iPSet) {}
: one::OutputModuleBase(iPSet),
one::OutputModule<>(iPSet),
verbose_(iPSet.getUntrackedParameter<bool>("verbose")) {}

// GetProductCheckerOutputModule::GetProductCheckerOutputModule(GetProductCheckerOutputModule const& rhs) {
// // do actual copying here;
@@ -77,12 +81,23 @@ namespace edm {
// member functions
//
template <typename T>
static void check(T const& p, std::string const& id, SelectedProducts const& iProducts) {
static void check(T const& p, std::string const& id, SelectedProducts const& iProducts, bool iVerbose) {
for (auto const& product : iProducts) {
BranchDescription const* branchDescription = product.first;
TypeID const& tid = branchDescription->unwrappedTypeID();
EDGetToken const& token = product.second;
BasicHandle bh = p.getByToken(token, tid);
if (iVerbose) {
if (bh.isValid()) {
edm::LogInfo("FoundProduct") << "found " << branchDescription->moduleLabel() << " '"
<< branchDescription->productInstanceName() << "' "
<< branchDescription->processName();
} else {
edm::LogInfo("DidNotFindProduct")
<< "did not find " << branchDescription->moduleLabel() << " '" << branchDescription->productInstanceName()
<< "' " << branchDescription->processName();
}
}
if (nullptr != bh.provenance() &&
bh.provenance()->branchDescription().branchID() != branchDescription->branchID()) {
throw cms::Exception("BranchIDMissMatch")
@@ -96,17 +111,17 @@ namespace edm {
void GetProductCheckerOutputModule::write(EventForOutput const& e) {
std::ostringstream str;
str << e.id();
check(e, str.str(), keptProducts()[InEvent]);
check(e, str.str(), keptProducts()[InEvent], verbose_);
}
void GetProductCheckerOutputModule::writeLuminosityBlock(LuminosityBlockForOutput const& l) {
std::ostringstream str;
str << l.id();
check(l, str.str(), keptProducts()[InLumi]);
check(l, str.str(), keptProducts()[InLumi], verbose_);
}
void GetProductCheckerOutputModule::writeRun(RunForOutput const& r) {
std::ostringstream str;
str << r.id();
check(r, str.str(), keptProducts()[InRun]);
check(r, str.str(), keptProducts()[InRun], verbose_);
}

//
@@ -120,6 +135,7 @@ namespace edm {
void GetProductCheckerOutputModule::fillDescriptions(ConfigurationDescriptions& descriptions) {
ParameterSetDescription desc;
one::OutputModule<>::fillDescription(desc);
desc.addUntracked<bool>("verbose", false);
descriptions.add("productChecker", desc);
}
} // namespace edm
103 changes: 99 additions & 4 deletions FWCore/ParameterSet/python/Config.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
from .Modules import _Module
from .SequenceTypes import *
from .SequenceTypes import _ModuleSequenceType, _Sequenceable #extend needs it
from .SequenceVisitors import PathValidator, EndPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor
from .SequenceVisitors import PathValidator, EndPathValidator, FinalPathValidator, ScheduleTaskValidator, NodeVisitor, CompositeVisitor, ModuleNamesFromGlobalsVisitor
from .MessageLogger import MessageLogger
from . import DictTypes

@@ -120,6 +120,7 @@ def __init__(self,name,*Mods):
self.__dict__['_Process__outputmodules'] = {}
self.__dict__['_Process__paths'] = DictTypes.SortedKeysDict() # have to keep the order
self.__dict__['_Process__endpaths'] = DictTypes.SortedKeysDict() # of definition
self.__dict__['_Process__finalpaths'] = DictTypes.SortedKeysDict() # of definition
self.__dict__['_Process__sequences'] = {}
self.__dict__['_Process__tasks'] = {}
self.__dict__['_Process__services'] = {}
@@ -289,6 +290,10 @@ def endpaths_(self):
"""returns a dict of the endpaths that have been added to the Process"""
return DictTypes.SortedAndFixedKeysDict(self.__endpaths)
endpaths = property(endpaths_,doc="dictionary containing the endpaths for the process")
def finalpaths_(self):
"""returns a dict of the finalpaths that have been added to the Process"""
return DictTypes.SortedAndFixedKeysDict(self.__finalpaths)
finalpaths = property(finalpaths_,doc="dictionary containing the finalpaths for the process")
def sequences_(self):
"""returns a dict of the sequences that have been added to the Process"""
return DictTypes.FixedKeysDict(self.__sequences)
@@ -477,6 +482,9 @@ def __setattr__(self,name,value):
s = self.__findFirstUsingModule(self.endpaths,oldValue)
if s is not None:
raise ValueError(msg1+"endpath "+s.label_()+msg2)
s = self.__findFirstUsingModule(self.finalpaths,oldValue)
if s is not None:
raise ValueError(msg1+"finalpath "+s.label_()+msg2)

# In case of EDAlias, raise Exception always to avoid surprises
if isinstance(newValue, EDAlias):
@@ -503,6 +511,9 @@ def __setattr__(self,name,value):
s = self.__findFirstUsingModule(self.endpaths,oldValue)
if s is not None:
raise ValueError(msg1+"endpath "+s.label_()+msg2)
s = self.__findFirstUsingModule(self.finalpaths,oldValue)
if s is not None:
raise ValueError(msg1+"finalpath "+s.label_()+msg2)

if not self.__InExtendCall and (Schedule._itemIsValid(newValue) or isinstance(newValue, Task)):
self._replaceInScheduleDirectly(name, newValue)
@@ -649,6 +660,13 @@ def _placeEndPath(self,name,mod):
except ModuleCloneError as msg:
context = format_outerframe(4)
raise Exception("%sThe module %s in endpath %s is unknown to the process %s." %(context, msg, name, self._Process__name))
def _placeFinalPath(self,name,mod):
self._validateSequence(mod, name)
try:
self._place(name, mod, self.__finalpaths)
except ModuleCloneError as msg:
context = format_outerframe(4)
raise Exception("%sThe module %s in finalpath %s is unknown to the process %s." %(context, msg, name, self._Process__name))
def _placeSequence(self,name,mod):
self._validateSequence(mod, name)
self._place(name, mod, self.__sequences)
@@ -804,6 +822,9 @@ def dumpConfig(self, options=PrintOptions()):
config+=self._dumpConfigNamedList(self.endpaths_().items(),
'endpath',
options)
config+=self._dumpConfigNamedList(self.finalpaths_().items(),
'finalpath',
options)
config+=self._dumpConfigUnnamedList(self.services_().items(),
'service',
options)
@@ -989,6 +1010,7 @@ def dumpPython(self, options=PrintOptions()):
result+=self._dumpPythonList(self._itemsInDependencyOrder(self.sequences), options)
result+=self._dumpPythonList(self.paths_(), options)
result+=self._dumpPythonList(self.endpaths_(), options)
result+=self._dumpPythonList(self.finalpaths_(), options)
result+=self._dumpPythonList(self.aliases_(), options)
if not self.schedule_() == None:
result += 'process.schedule = ' + self.schedule.dumpPython(options)
@@ -1035,6 +1057,7 @@ def splitPython(self, options = PrintOptions()):
parts.update(self._splitPythonList('sequences', self._itemsInDependencyOrder(self.sequences), options))
parts.update(self._splitPythonList('paths', self.paths_(), options))
parts.update(self._splitPythonList('paths', self.endpaths_(), options))
parts.update(self._splitPythonList('paths', self.finalpaths_(), options))
parts.update(self._splitPythonList('modules', self.aliases_(), options))

if options.targetDirectory is not None:
@@ -1083,6 +1106,8 @@ def _replaceInSequences(self, label, new):
sequenceable.replace(old,new)
for sequenceable in self.endpaths.values():
sequenceable.replace(old,new)
for sequenceable in self.finalpaths.values():
sequenceable.replace(old,new)
def _replaceInTasks(self, label, new):
old = getattr(self,label)
for task in self.tasks.values():
@@ -1148,6 +1173,7 @@ def _insertPaths(self, processPSet, nodeVisitor):
scheduledPaths = []
triggerPaths = []
endpaths = []
finalpaths = []
if self.schedule_() == None:
# make one from triggerpaths & endpaths
for name in self.paths_():
@@ -1156,19 +1182,47 @@ def _insertPaths(self, processPSet, nodeVisitor):
for name in self.endpaths_():
scheduledPaths.append(name)
endpaths.append(name)
for name in self.finalpaths_():
finalpaths.append(name)
else:
for path in self.schedule_():
pathname = path.label_()
scheduledPaths.append(pathname)
if pathname in self.endpaths_():
endpaths.append(pathname)
scheduledPaths.append(pathname)
elif pathname in self.finalpaths_():
finalpaths.append(pathname)
else:
scheduledPaths.append(pathname)
triggerPaths.append(pathname)
for task in self.schedule_()._tasks:
task.resolve(self.__dict__)
scheduleTaskValidator = ScheduleTaskValidator()
task.visit(scheduleTaskValidator)
task.visit(nodeVisitor)
# consolidate all final_paths into one EndPath
endPathWithFinalPathModulesName ="@finalPath"
finalPathEndPath = EndPath()
if finalpaths:
endpaths.append(endPathWithFinalPathModulesName)
scheduledPaths.append(endPathWithFinalPathModulesName)
finalpathValidator = FinalPathValidator()
modulesOnFinalPath = []
for finalpathname in finalpaths:
iFinalPath = self.finalpaths_()[finalpathname]
iFinalPath.resolve(self.__dict__)
finalpathValidator.setLabel(finalpathname)
iFinalPath.visit(finalpathValidator)
if finalpathValidator.filtersOnFinalpaths or finalpathValidator.producersOnFinalpaths:
names = [p.label_ for p in finalpathValidator.filtersOnFinalpaths]
names.extend( [p.label_ for p in finalpathValidator.producersOnFinalpaths])
raise RuntimeError("FinalPath %s has non OutputModules %s" % (finalpathname, ",".join(names)))
modulesOnFinalPath.extend(iFinalPath.moduleNames())
for m in modulesOnFinalPath:
mod = getattr(self, m)
setattr(mod, "@onFinalPath", untracked.bool(True))
finalPathEndPath += mod

processPSet.addVString(True, "@end_paths", endpaths)
processPSet.addVString(True, "@paths", scheduledPaths)
# trigger_paths are a little different
@@ -1190,19 +1244,25 @@ def _insertPaths(self, processPSet, nodeVisitor):
iPath.visit(pathCompositeVisitor)
iPath.insertInto(processPSet, triggername, decoratedList)
for endpathname in endpaths:
iEndPath = self.endpaths_()[endpathname]
if endpathname is not endPathWithFinalPathModulesName:
iEndPath = self.endpaths_()[endpathname]
else:
iEndPath = finalPathEndPath
iEndPath.resolve(self.__dict__)
endpathValidator.setLabel(endpathname)
lister.initialize()
iEndPath.visit(endpathCompositeVisitor)
iEndPath.insertInto(processPSet, endpathname, decoratedList)
processPSet.addVString(False, "@filters_on_endpaths", endpathValidator.filtersOnEndpaths)


def resolve(self,keepUnresolvedSequencePlaceholders=False):
for x in self.paths.values():
x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders)
for x in self.endpaths.values():
x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders)
for x in self.finalpaths.values():
x.resolve(self.__dict__,keepUnresolvedSequencePlaceholders)
if not self.schedule_() == None:
for task in self.schedule_()._tasks:
task.resolve(self.__dict__,keepUnresolvedSequencePlaceholders)
@@ -1231,6 +1291,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False):
schedNames = set(( x.label_() for x in self.schedule_()))
names = set(self.paths)
names.update(set(self.endpaths))
names.update(set(self.finalpaths))
unneededPaths = names - schedNames
for n in unneededPaths:
delattr(self,n)
@@ -1241,6 +1302,7 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False):
else:
pths = list(self.paths.values())
pths.extend(self.endpaths.values())
pths.extend(self.finalpaths.values())
temp = Schedule(*pths)
usedModules=set(temp.moduleNames())
unneededModules = self._pruneModules(self.producers_(), usedModules)
@@ -1256,6 +1318,9 @@ def prune(self,verbose=False,keepUnresolvedSequencePlaceholders=False):
for p in self.endpaths.values():
p.visit(sv)
p.visit(tv)
for p in self.finalpaths.values():
p.visit(sv)
p.visit(tv)
def removeUnneeded(seqOrTasks, allSequencesOrTasks):
_keepSet = set(( s for s in seqOrTasks if s.hasLabel_()))
_availableSet = set(allSequencesOrTasks.values())
@@ -1272,7 +1337,7 @@ def removeUnneeded(seqOrTasks, allSequencesOrTasks):
print(" modules:"+",".join(unneededModules))
print(" tasks:"+",".join(unneededTaskLabels))
print(" sequences:"+",".join(unneededSeqLabels))
print(" paths/endpaths:"+",".join(unneededPaths))
print(" paths/endpaths/finalpaths:"+",".join(unneededPaths))
def _pruneModules(self, d, scheduledNames):
moduleNames = set(d.keys())
junk = moduleNames - scheduledNames
@@ -2635,6 +2700,36 @@ def testPath(self):
t = Path(p.a, p.t1, Task(), p.t1)
self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a, cms.Task(), process.t1)\n')

def testFinalPath(self):
p = Process("test")
p.a = OutputModule("MyOutputModule")
p.b = OutputModule("YourOutputModule")
p.c = OutputModule("OurOutputModule")
path = FinalPath(p.a)
path *= p.b
path += p.c
self.assertEqual(str(path),'a+b+c')
path = FinalPath(p.a*p.b+p.c)
self.assertEqual(str(path),'a+b+c')
path = FinalPath(p.a+ p.b*p.c)
self.assertEqual(str(path),'a+b+c')
path = FinalPath(p.a*(p.b+p.c))
self.assertEqual(str(path),'a+b+c')
p.es = ESProducer("AnESProducer")
self.assertRaises(TypeError,FinalPath,p.es)

t = FinalPath()
self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath()\n')

t = FinalPath(p.a)
self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.FinalPath(process.a)\n')

self.assertRaises(TypeError, FinalPath, Task())
self.assertRaises(TypeError, FinalPath, p.a, Task())

p.prod = EDProducer("prodName")
p.t1 = Task(p.prod)
self.assertRaises(TypeError, FinalPath, p.a, p.t1, Task(), p.t1)
def testCloneSequence(self):
p = Process("test")
a = EDAnalyzer("MyAnalyzer")
10 changes: 9 additions & 1 deletion FWCore/ParameterSet/python/SequenceTypes.py
Original file line number Diff line number Diff line change
@@ -651,6 +651,14 @@ def __init__(self,*arg,**argv):
def _placeImpl(self,name,proc):
proc._placeEndPath(name,self)

class FinalPath(_ModuleSequenceType):
def __init__(self,*arg,**argv):
super(FinalPath,self).__init__(*arg,**argv)
def _placeImpl(self,name,proc):
proc._placeFinalPath(name,self)
def associate(self,task):
raise TypeError("FinalPath does not allow associations with Tasks")

class Sequence(_ModuleSequenceType,_Sequenceable):
def __init__(self,*arg,**argv):
super(Sequence,self).__init__(*arg,**argv)
@@ -746,7 +754,7 @@ def associate(self,*tasks):
self._tasks.add(task)
@staticmethod
def _itemIsValid(item):
return isinstance(item,Path) or isinstance(item,EndPath)
return isinstance(item,Path) or isinstance(item,EndPath) or isinstance(item,FinalPath)
def copy(self):
import copy
aCopy = copy.copy(self)
30 changes: 30 additions & 0 deletions FWCore/ParameterSet/python/SequenceVisitors.py
Original file line number Diff line number Diff line change
@@ -67,6 +67,36 @@ def leave(self,visitee):
if isinstance(visitee, Task):
self._levelInTasks -= 1

# Use this on EndPaths
class FinalPathValidator(object):
def __init__(self):
self.__label = ''
self._levelInTasks = 0
self.filtersOnFinalpaths = []
self.producersOnFinalpaths = []
def setLabel(self,label):
self.__label = "'"+label+"' "
def enter(self,visitee):
if visitee.isLeaf():
if isinstance(visitee, _Labelable):
if not visitee.hasLabel_():
raise ValueError("FinalPath "+self.__label+"contains a module of type '"+visitee.type_()+"' which has\nno assigned label.")
elif isinstance(visitee, Service):
if not visitee._inProcess:
raise ValueError("FinalPath "+self.__label+"contains a service of type '"+visitee.type_()+"' which is not attached to the process.\n")
if isinstance(visitee, Task):
self._levelInTasks += 1
if self._levelInTasks > 0:
return
if isinstance(visitee,EDFilter):
self.filtersOnFinalpaths.append(visitee.type_())
if isinstance(visitee,EDProducer):
self.producersOnFinalpaths.append(visitee.type_())
def leave(self,visitee):
if self._levelInTasks > 0:
if isinstance(visitee, Task):
self._levelInTasks -= 1

class NodeVisitor(object):
"""Form sets of all modules, ESProducers, ESSources and Services in visited objects. Can be used
to visit Paths, EndPaths, Sequences or Tasks. Includes in sets objects on sub-Sequences and sub-Tasks"""

0 comments on commit ea5c1fc

Please sign in to comment.