Skip to content

Commit

Permalink
Merge pull request #34231 from wddgit/modifyOptions
Browse files Browse the repository at this point in the history
Enable concurrent lumis and IOVs by default when number of streams is at least 2
  • Loading branch information
cmsbuild authored Jul 3, 2021
2 parents 3a2a90d + 8a8e0a8 commit 3dba761
Show file tree
Hide file tree
Showing 21 changed files with 223 additions and 59 deletions.
58 changes: 36 additions & 22 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,20 +209,14 @@ namespace edm {

std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
eventsetup::EventSetupProvider& cp,
ParameterSet& params) {
ParameterSet& params,
std::vector<std::string> const& loopers) {
std::shared_ptr<EDLooperBase> vLooper;

std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");

if (loopers.empty()) {
return vLooper;
}

assert(1 == loopers.size());

for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
++itName) {
ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
for (auto const& looperName : loopers) {
ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
validateLooper(*providerPSet);
providerPSet->registerIt();
vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
Expand Down Expand Up @@ -386,9 +380,6 @@ namespace edm {
if (nStreams == 0) {
nStreams = nThreads;
}
if (nThreads > 1 or nStreams > 1) {
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
}
unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
if (nConcurrentRuns != 1) {
throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
Expand All @@ -397,8 +388,35 @@ namespace edm {
unsigned int nConcurrentLumis =
optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
if (nConcurrentLumis == 0) {
nConcurrentLumis = nConcurrentRuns;
nConcurrentLumis = 2;
}
if (nConcurrentLumis > nStreams) {
nConcurrentLumis = nStreams;
}
std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
if (!loopers.empty()) {
//For now loopers make us run only 1 transition at a time
if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
"of concurrent runs, and the number of concurrent lumis "
"are all being reset to 1. Loopers cannot currently support "
"values greater than 1.";
nStreams = 1;
nConcurrentLumis = 1;
nConcurrentRuns = 1;
}
}
bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
if (dumpOptions) {
dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
} else {
if (nThreads > 1 or nStreams > 1) {
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
}
}
// The number of concurrent IOVs is configured individually for each record in
// the class NumberOfConcurrentIOVs to values less than or equal to this.
unsigned int maxConcurrentIOVs = nConcurrentLumis;

//Check that relationships between threading parameters makes sense
/*
Expand Down Expand Up @@ -439,22 +457,18 @@ namespace edm {

// intialize the event setup provider
ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
esp_ = espController_->makeProvider(
*parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);

// initialize the looper, if any
looper_ = fillLooper(*espController_, *esp_, *parameterSet);
if (looper_) {
if (!loopers.empty()) {
looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
looper_->setActionTable(items.act_table_.get());
looper_->attachTo(*items.actReg_);

//For now loopers make us run only 1 transition at a time
nStreams = 1;
nConcurrentLumis = 1;
nConcurrentRuns = 1;
// in presence of looper do not delete modules
deleteNonConsumedUnscheduledModules_ = false;
}
espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);

preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};

Expand Down
10 changes: 4 additions & 6 deletions FWCore/Framework/src/EventSetupsController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ namespace edm {

std::shared_ptr<EventSetupProvider> EventSetupsController::makeProvider(ParameterSet& iPSet,
ActivityRegistry* activityRegistry,
ParameterSet const* eventSetupPset) {
ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions) {
// Makes an EventSetupProvider
// Also parses the prefer information from ParameterSets and puts
// it in a map that is stored in the EventSetupProvider
Expand All @@ -53,16 +55,12 @@ namespace edm {
// EventSetupsController and in the EventSetupProvider
fillEventSetupProvider(*this, *returnValue, iPSet);

numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset);
numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset, maxConcurrentIOVs, dumpOptions);

providers_.push_back(returnValue);
return returnValue;
}

void EventSetupsController::setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis) {
numberOfConcurrentIOVs_.setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
}

void EventSetupsController::finishConfiguration() {
if (mustFinishConfiguration_) {
for (auto& eventSetupProvider : providers_) {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/src/EventSetupsController.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ namespace edm {

std::shared_ptr<EventSetupProvider> makeProvider(ParameterSet&,
ActivityRegistry*,
ParameterSet const* eventSetupPset = nullptr);

void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis);
ParameterSet const* eventSetupPset = nullptr,
unsigned int maxConcurrentIOVs = 0,
bool dumpOptions = false);

// Pass in an IOVSyncValue to let the EventSetup system know which run and lumi
// need to be processed and prepare IOVs for it (also could be a time or only a run).
Expand Down
18 changes: 10 additions & 8 deletions FWCore/Framework/src/NumberOfConcurrentIOVs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ namespace edm {

NumberOfConcurrentIOVs::NumberOfConcurrentIOVs() : numberConcurrentIOVs_(1) {}

void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset) {
void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions) {
if (eventSetupPset) { // this condition is false for SubProcesses
maxConcurrentIOVs_ = maxConcurrentIOVs;
numberConcurrentIOVs_ = eventSetupPset->getUntrackedParameter<unsigned int>("numberOfConcurrentIOVs");
if (numberConcurrentIOVs_ == 0) {
numberConcurrentIOVs_ = 1;
if (numberConcurrentIOVs_ == 0 || numberConcurrentIOVs_ > maxConcurrentIOVs) {
numberConcurrentIOVs_ = maxConcurrentIOVs;
}
if (dumpOptions) {
LogAbsolute("Options") << "Number of Concurrent IOVs = " << numberConcurrentIOVs_;
}

ParameterSet const& pset(eventSetupPset->getUntrackedParameterSet("forceNumberOfConcurrentIOVs"));
Expand All @@ -36,10 +42,6 @@ namespace edm {
}
}

void NumberOfConcurrentIOVs::setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis) {
maxConcurrentIOVs_ = std::min(nStreams, nConcurrentLumis);
}

void NumberOfConcurrentIOVs::fillRecordsNotAllowingConcurrentIOVs(EventSetupProvider const& eventSetupProvider) {
eventSetupProvider.fillRecordsNotAllowingConcurrentIOVs(recordsNotAllowingConcurrentIOVs_);
}
Expand Down Expand Up @@ -69,7 +71,7 @@ namespace edm {
<< "But you cannot have more concurrent IOVs than lumis or streams.\n"
<< "There will not be more than " << maxConcurrentIOVs_ << " concurrent IOVs.\n";
}
return std::min(numberConcurrentIOVs_, maxConcurrentIOVs_);
return numberConcurrentIOVs_;
}

void NumberOfConcurrentIOVs::clear() {
Expand Down
7 changes: 3 additions & 4 deletions FWCore/Framework/src/NumberOfConcurrentIOVs.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ namespace edm {
public:
NumberOfConcurrentIOVs();

void readConfigurationParameters(ParameterSet const* eventSetupPset);

// Can't have more concurrent IOVs than streams or concurrent lumis
void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis);
void readConfigurationParameters(ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions);

// This depends on bool's hard coded in the EventSetupRecord C++ classes
void fillRecordsNotAllowingConcurrentIOVs(EventSetupProvider const&);
Expand Down
2 changes: 2 additions & 0 deletions FWCore/Framework/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
<use name="FWCore/Utilities"/>
</bin>

<test name="TestFWCoreFrameworkOptions" command="run_testOptions.sh ${value}" for="0,4"/>

<library file="stubs/TestTriggerNames.cc" name="TestTriggerNames">
<flags EDM_PLUGIN="1"/>
<use name="DataFormats/Common"/>
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/run_concurrent_lumis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ touch empty_file

(cmsRun ${LOCAL_TEST_DIR}/test_2_concurrent_lumis_cfg.py 2>&1) | tail -n 1 | grep -v ' 0 ' | grep -v 'e-' | diff - empty_file && die "Failure using test_2_concurrent_lumis_cfg.py" $?

exit 0
exit 0
30 changes: 30 additions & 0 deletions FWCore/Framework/test/run_testOptions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash

# Capable of running 5 tests, this bash script expects a command line
# argument from 0 to 4 specifying which test to run

LOCAL_TEST_DIR=${CMSSW_BASE}/src/FWCore/Framework/test
LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH}

function die { echo Failure $1: status $2 ; exit $2 ; }

pushd ${LOCAL_TMP_DIR}

echo "Running run_testOptions.sh $1"

# Configuration files and expected outputs for the 5 tests
configFiles=("testOptions0_cfg.py" "testOptions1_cfg.py" "testOptions2_cfg.py" "testOptions3_cfg.py" "testOptions4_cfg.py")
expectedStreams=(1 4 4 4 1)
expectedConcurrentLumis=(1 3 2 4 1)
expectedConcurrentIOVs=(1 2 2 4 1)

cmsRun -p ${LOCAL_TEST_DIR}/${configFiles[$1]} >& ${configFiles[$1]}.log || die "cmsRun ${configFiles[$1]}" $?
grep "Number of Streams = ${expectedStreams[$1]}" ${configFiles[$1]}.log || die "Failed number of streams test" $?
grep "Number of Concurrent Lumis = ${expectedConcurrentLumis[$1]}" ${configFiles[$1]}.log || die "Failed number of concurrent lumis test" $?
grep "Number of Concurrent IOVs = ${expectedConcurrentIOVs[$1]}" ${configFiles[$1]}.log || die "Failed number of concurrent IOVs test" $?

rm ${configFiles[$1]}.log

popd

exit 0
8 changes: 8 additions & 0 deletions FWCore/Framework/test/testOptions0_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Test of options when all parameters taken from Config.py
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)
process.options.dumpOptions = True
17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions1_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when all parameters explicitly set
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 3,
eventSetup = dict(
numberOfConcurrentIOVs = 2
)
)
17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions2_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when parameters are zero
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 0,
numberOfConcurrentLuminosityBlocks = 0,
eventSetup = dict(
numberOfConcurrentIOVs = 0
)
)
17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions3_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when concurrentIOVs and concurrentLumis are too big
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 5,
eventSetup = dict(
numberOfConcurrentIOVs = 5
)
)
21 changes: 21 additions & 0 deletions FWCore/Framework/test/testOptions4_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Test of options when a looper is in the job
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 7,
eventSetup = dict(
numberOfConcurrentIOVs = 7
)
)

process.looper = cms.Looper("DummyLooper",
value = cms.untracked.int32(4)
)
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_global_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)

process.maxEvents = cms.untracked.PSet(
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_limited_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)


Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_stream_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import FWCore.Framework.test.cmsExceptionsFatalOption_cff
process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)


Expand Down
4 changes: 4 additions & 0 deletions FWCore/ParameterSet/interface/validateTopLevelParameterSets.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ namespace edm {
void fillMaxEventsDescription(ParameterSetDescription& description);
void fillMaxLuminosityBlocksDescription(ParameterSetDescription& description);
void fillMaxSecondsUntilRampdownDescription(ParameterSetDescription& description);
void dumpOptionsToLogFile(unsigned int nThreads,
unsigned int nStreams,
unsigned int nConcurrentLumis,
unsigned int nConcurrentRuns);
} // namespace edm
#endif
Loading

0 comments on commit 3dba761

Please sign in to comment.