Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable concurrent lumis and IOVs by default when number of streams is at least 2 #34231

Merged
merged 7 commits into from
Jul 3, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,20 +209,14 @@ namespace edm {

std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController,
eventsetup::EventSetupProvider& cp,
ParameterSet& params) {
ParameterSet& params,
std::vector<std::string> const& loopers) {
std::shared_ptr<EDLooperBase> vLooper;

std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers");

if (loopers.empty()) {
return vLooper;
}

assert(1 == loopers.size());

for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd;
++itName) {
ParameterSet* providerPSet = params.getPSetForUpdate(*itName);
for (auto const& looperName : loopers) {
ParameterSet* providerPSet = params.getPSetForUpdate(looperName);
validateLooper(*providerPSet);
providerPSet->registerIt();
vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet);
Expand Down Expand Up @@ -386,9 +380,6 @@ namespace edm {
if (nStreams == 0) {
nStreams = nThreads;
}
if (nThreads > 1 or nStreams > 1) {
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
}
unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns");
if (nConcurrentRuns != 1) {
throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ")
Expand All @@ -397,8 +388,35 @@ namespace edm {
unsigned int nConcurrentLumis =
optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks");
if (nConcurrentLumis == 0) {
nConcurrentLumis = nConcurrentRuns;
nConcurrentLumis = 2;
}
if (nConcurrentLumis > nStreams) {
nConcurrentLumis = nStreams;
}
std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers");
if (!loopers.empty()) {
//For now loopers make us run only 1 transition at a time
if (nStreams != 1 || nConcurrentLumis != 1 || nConcurrentRuns != 1) {
edm::LogWarning("ThreadStreamSetup") << "There is a looper, so the number of streams, the number "
"of concurrent runs, and the number of concurrent lumis "
"are all being reset to 1. Loopers cannot currently support "
"values greater than 1.";
nStreams = 1;
nConcurrentLumis = 1;
nConcurrentRuns = 1;
}
}
bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions");
if (dumpOptions) {
dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns);
} else {
if (nThreads > 1 or nStreams > 1) {
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams;
}
}
// The number of concurrent IOVs is configured individually for each record in
// the class NumberOfConcurrentIOVs to values less than or equal to this.
unsigned int maxConcurrentIOVs = nConcurrentLumis;

//Check that relationships between threading parameters makes sense
/*
Expand Down Expand Up @@ -439,22 +457,18 @@ namespace edm {

// intialize the event setup provider
ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup"));
esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset);
esp_ = espController_->makeProvider(
*parameterSet, items.actReg_.get(), &eventSetupPset, maxConcurrentIOVs, dumpOptions);

// initialize the looper, if any
looper_ = fillLooper(*espController_, *esp_, *parameterSet);
if (looper_) {
if (!loopers.empty()) {
looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers);
looper_->setActionTable(items.act_table_.get());
looper_->attachTo(*items.actReg_);

//For now loopers make us run only 1 transition at a time
nStreams = 1;
nConcurrentLumis = 1;
nConcurrentRuns = 1;
// in presence of looper do not delete modules
deleteNonConsumedUnscheduledModules_ = false;
}
espController_->setMaxConcurrentIOVs(nStreams, nConcurrentLumis);

preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns};

Expand Down
10 changes: 4 additions & 6 deletions FWCore/Framework/src/EventSetupsController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ namespace edm {

std::shared_ptr<EventSetupProvider> EventSetupsController::makeProvider(ParameterSet& iPSet,
ActivityRegistry* activityRegistry,
ParameterSet const* eventSetupPset) {
ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions) {
// Makes an EventSetupProvider
// Also parses the prefer information from ParameterSets and puts
// it in a map that is stored in the EventSetupProvider
Expand All @@ -53,16 +55,12 @@ namespace edm {
// EventSetupsController and in the EventSetupProvider
fillEventSetupProvider(*this, *returnValue, iPSet);

numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset);
numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset, maxConcurrentIOVs, dumpOptions);

providers_.push_back(returnValue);
return returnValue;
}

void EventSetupsController::setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis) {
numberOfConcurrentIOVs_.setMaxConcurrentIOVs(nStreams, nConcurrentLumis);
}

void EventSetupsController::finishConfiguration() {
if (mustFinishConfiguration_) {
for (auto& eventSetupProvider : providers_) {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/src/EventSetupsController.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ namespace edm {

std::shared_ptr<EventSetupProvider> makeProvider(ParameterSet&,
ActivityRegistry*,
ParameterSet const* eventSetupPset = nullptr);

void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis);
ParameterSet const* eventSetupPset = nullptr,
unsigned int maxConcurrentIOVs = 0,
bool dumpOptions = false);

// Pass in an IOVSyncValue to let the EventSetup system know which run and lumi
// need to be processed and prepare IOVs for it (also could be a time or only a run).
Expand Down
18 changes: 10 additions & 8 deletions FWCore/Framework/src/NumberOfConcurrentIOVs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ namespace edm {

// Starts with one concurrent IOV. readConfigurationParameters() only replaces
// this value when it is handed a non-null eventSetup ParameterSet.
NumberOfConcurrentIOVs::NumberOfConcurrentIOVs() : numberConcurrentIOVs_(1) {}

void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset) {
void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions) {
if (eventSetupPset) { // this condition is false for SubProcesses
maxConcurrentIOVs_ = maxConcurrentIOVs;
numberConcurrentIOVs_ = eventSetupPset->getUntrackedParameter<unsigned int>("numberOfConcurrentIOVs");
if (numberConcurrentIOVs_ == 0) {
numberConcurrentIOVs_ = 1;
if (numberConcurrentIOVs_ == 0 || numberConcurrentIOVs_ > maxConcurrentIOVs) {
numberConcurrentIOVs_ = maxConcurrentIOVs;
}
if (dumpOptions) {
LogAbsolute("Options") << "Number of Concurrent IOVs = " << numberConcurrentIOVs_;
}

ParameterSet const& pset(eventSetupPset->getUntrackedParameterSet("forceNumberOfConcurrentIOVs"));
Expand All @@ -36,10 +42,6 @@ namespace edm {
}
}

void NumberOfConcurrentIOVs::setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis) {
maxConcurrentIOVs_ = std::min(nStreams, nConcurrentLumis);
}

// Delegates to the given EventSetupProvider, passing it the member
// recordsNotAllowingConcurrentIOVs_ to fill.
void NumberOfConcurrentIOVs::fillRecordsNotAllowingConcurrentIOVs(EventSetupProvider const& eventSetupProvider) {
eventSetupProvider.fillRecordsNotAllowingConcurrentIOVs(recordsNotAllowingConcurrentIOVs_);
}
Expand Down Expand Up @@ -69,7 +71,7 @@ namespace edm {
<< "But you cannot have more concurrent IOVs than lumis or streams.\n"
<< "There will not be more than " << maxConcurrentIOVs_ << " concurrent IOVs.\n";
}
return std::min(numberConcurrentIOVs_, maxConcurrentIOVs_);
return numberConcurrentIOVs_;
}

void NumberOfConcurrentIOVs::clear() {
Expand Down
7 changes: 3 additions & 4 deletions FWCore/Framework/src/NumberOfConcurrentIOVs.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ namespace edm {
public:
NumberOfConcurrentIOVs();

void readConfigurationParameters(ParameterSet const* eventSetupPset);

// Can't have more concurrent IOVs than streams or concurrent lumis
void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis);
void readConfigurationParameters(ParameterSet const* eventSetupPset,
unsigned int maxConcurrentIOVs,
bool dumpOptions);

// This depends on bool's hard coded in the EventSetupRecord C++ classes
void fillRecordsNotAllowingConcurrentIOVs(EventSetupProvider const&);
Expand Down
6 changes: 6 additions & 0 deletions FWCore/Framework/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
<use name="FWCore/Utilities"/>
</bin>

<test name="TestFWCoreFrameworkOptions0" command="run_testOptions.sh 0"/>
<test name="TestFWCoreFrameworkOptions1" command="run_testOptions.sh 1"/>
<test name="TestFWCoreFrameworkOptions2" command="run_testOptions.sh 2"/>
<test name="TestFWCoreFrameworkOptions3" command="run_testOptions.sh 3"/>
<test name="TestFWCoreFrameworkOptions4" command="run_testOptions.sh 4"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wddgit, these are good candidates for a recently added feature of scram V3 (which I will present in one of the core software meetings). You can use either

<test name="TestFWCoreFrameworkOptions" command="run_testOptions.sh ${value}" for="0,4"/>

or

<test name="TestFWCoreFrameworkOptions" command="run_testOptions.sh ${value}" foreach="0,1,2,3,4"/>

and scram will define TestFWCoreFrameworkOptions_N tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pushed a commit using the new for loop in the BuildFile. It seems to work well. Thanks.


<library file="stubs/TestTriggerNames.cc" name="TestTriggerNames">
<flags EDM_PLUGIN="1"/>
<use name="DataFormats/Common"/>
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/run_concurrent_lumis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ touch empty_file

(cmsRun ${LOCAL_TEST_DIR}/test_2_concurrent_lumis_cfg.py 2>&1) | tail -n 1 | grep -v ' 0 ' | grep -v 'e-' | diff - empty_file && die "Failure using test_2_concurrent_lumis_cfg.py" $?

exit 0
exit 0
30 changes: 30 additions & 0 deletions FWCore/Framework/test/run_testOptions.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash

# Capable of running 5 tests, this bash script expects a command line
# argument from 0 to 4 specifying which test to run

# Directory holding the testOptions*_cfg.py configuration files passed to cmsRun
LOCAL_TEST_DIR=${CMSSW_BASE}/src/FWCore/Framework/test
# Scratch directory the script changes into, so the cmsRun log file is created there
LOCAL_TMP_DIR=${CMSSW_BASE}/tmp/${SCRAM_ARCH}
Copy link
Contributor

@makortel makortel Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smuzaffar Could you double-check that these are fine? In some other tests using the <test> directive we've used $LOCALTOP instead of $CMSSW_BASE, and just run in the current working directory instead of pushd/cd to elsewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some level this is a management policy decision. Where do we want files generated while running tests to go? I always thought that was the purpose of the tmp/slc7_amd64_gcc900 directory. I thought the tests that left files at the top level of the working directory were misbehaving (there are several that still do). I always fixed that when I touched them for some other purpose. That is what the pushd and popd commands were for.

This is similar to the question that came up in the other PR. What should we do with log files and little data files generated by the tests? Delete them if the shell script succeeds? Leave them always just in case someone wants to look at them? Is it a judgement call to be made on a case by case basis?

Copy link
Contributor

@smuzaffar smuzaffar Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOCALTOP is available only while scram build is running, whereas CMSSW_BASE is available in the environment after cmsenv. So if you want to run your script directly from the command line, it is better to use CMSSW_BASE.

For temporary logs and output files, it is better to create them under ${CMSSW_BASE}/tmp/${SCRAM_ARCH}, and even better to create a sub-directory per test there. This also helps with cleanup. At one point I wanted to automate this, but a few tests failed because they depend on other tests' output or assume that they are run from the top-level $CMSSW_BASE directory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @smuzaffar. I take it that we could look into changing the scripts that use $LOCALTOP to put their outputs into ${CMSSW_BASE}/tmp/${SCRAM_ARCH}.


# Print a failure message and abort the script.
#   $1 - description of the step that failed
#   $2 - exit status to report and to exit with
function die { echo "Failure $1: status $2" ; exit $2 ; }

# The argument is used as an index into the arrays below, so reject anything
# other than a single digit from 0 to 4 before it is used.
case "$1" in
  [0-4]) ;;
  *) die "run_testOptions.sh expects one argument from 0 to 4, got '$1'" 1 ;;
esac

# Work in the scratch directory so the log file is not left in the caller's
# directory. Abort if the directory is unavailable rather than running (and
# later deleting files) in the wrong place.
pushd "${LOCAL_TMP_DIR}" || die "pushd ${LOCAL_TMP_DIR}" $?

echo "Running run_testOptions.sh $1"

# Configuration files and expected outputs for the 5 tests
# (element N of each array belongs to test N)
configFiles=("testOptions0_cfg.py" "testOptions1_cfg.py" "testOptions2_cfg.py" "testOptions3_cfg.py" "testOptions4_cfg.py")
expectedStreams=(1 4 4 4 1)
expectedConcurrentLumis=(1 3 2 4 1)
expectedConcurrentIOVs=(1 2 2 4 1)

configFile=${configFiles[$1]}
logFile=${configFile}.log

# Run the job, capturing stdout and stderr, then check the dumped option values
cmsRun -p "${LOCAL_TEST_DIR}/${configFile}" >& "${logFile}" || die "cmsRun ${configFile}" $?
grep "Number of Streams = ${expectedStreams[$1]}" "${logFile}" || die "Failed number of streams test" $?
grep "Number of Concurrent Lumis = ${expectedConcurrentLumis[$1]}" "${logFile}" || die "Failed number of concurrent lumis test" $?
grep "Number of Concurrent IOVs = ${expectedConcurrentIOVs[$1]}" "${logFile}" || die "Failed number of concurrent IOVs test" $?

# The log is only removed on success; a failing step exits through die first
rm "${logFile}"

popd

exit 0
8 changes: 8 additions & 0 deletions FWCore/Framework/test/testOptions0_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Test of options when all parameters taken from Config.py
#
# Test 0 of run_testOptions.sh, which expects the job to report
# 1 stream, 1 concurrent lumi and 1 concurrent IOV.
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
# A single event is enough: only the dumped option values are checked
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)
# Only dumpOptions is set here; every other option keeps its default
process.options.dumpOptions = True
17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions1_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when all parameters explicitly set
#
# Test 1 of run_testOptions.sh, which expects the job to report
# 4 streams, 3 concurrent lumis and 2 concurrent IOVs, i.e. the values
# below are used unchanged because none of them exceeds its upper limit.
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
# A single event is enough: only the dumped option values are checked
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 3,
eventSetup = dict(
numberOfConcurrentIOVs = 2
)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to do process.options.dumpOptions = True

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It works. It is a little more concise. Thanks.

17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions2_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when parameters are zero
#
# Test 2 of run_testOptions.sh, which expects the job to report
# 4 streams, 2 concurrent lumis and 2 concurrent IOVs. A value of zero
# asks the framework to choose: the number of streams follows the number
# of threads, the number of concurrent lumis becomes 2, and the number of
# concurrent IOVs follows the number of concurrent lumis.
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
# A single event is enough: only the dumped option values are checked
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 0,
numberOfConcurrentLuminosityBlocks = 0,
eventSetup = dict(
numberOfConcurrentIOVs = 0
)
)
17 changes: 17 additions & 0 deletions FWCore/Framework/test/testOptions3_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Test of options when concurrentIOVs and concurrentLumis are too big
#
# Test 3 of run_testOptions.sh, which expects the job to report
# 4 streams, 4 concurrent lumis and 4 concurrent IOVs: the requested
# value of 5 is larger than the number of streams, so the number of
# concurrent lumis is reduced to 4 and the number of concurrent IOVs is
# in turn limited to the number of concurrent lumis.
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
# A single event is enough: only the dumped option values are checked
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 5,
eventSetup = dict(
numberOfConcurrentIOVs = 5
)
)
21 changes: 21 additions & 0 deletions FWCore/Framework/test/testOptions4_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Test of options when a looper is in the job
#
# Test 4 of run_testOptions.sh, which expects the job to report
# 1 stream, 1 concurrent lumi and 1 concurrent IOV: when a looper is
# configured the framework resets the number of streams, concurrent runs
# and concurrent lumis to 1 (with a warning), whatever is requested below.
import FWCore.ParameterSet.Config as cms
process = cms.Process("TEST")
process.source = cms.Source("EmptySource")
# A single event is enough: only the dumped option values are checked
process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(1)
)

# Deliberately request more than one stream/lumi/IOV so the reset is visible
process.options = dict(
dumpOptions = True,
numberOfThreads = 4,
numberOfStreams = 4,
numberOfConcurrentLuminosityBlocks = 7,
eventSetup = dict(
numberOfConcurrentIOVs = 7
)
)

# The presence of this looper is what triggers the reset to 1
process.looper = cms.Looper("DummyLooper",
value = cms.untracked.int32(4)
)
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_global_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)

process.maxEvents = cms.untracked.PSet(
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_limited_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)


Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/test/test_stream_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import FWCore.Framework.test.cmsExceptionsFatalOption_cff
process.options = cms.untracked.PSet(
numberOfStreams = cms.untracked.uint32(nStreams),
numberOfThreads = cms.untracked.uint32(nStreams)
numberOfThreads = cms.untracked.uint32(nStreams),
numberOfConcurrentLuminosityBlocks = cms.untracked.uint32(1)
)


Expand Down
4 changes: 4 additions & 0 deletions FWCore/ParameterSet/interface/validateTopLevelParameterSets.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ namespace edm {
void fillMaxEventsDescription(ParameterSetDescription& description);
void fillMaxLuminosityBlocksDescription(ParameterSetDescription& description);
void fillMaxSecondsUntilRampdownDescription(ParameterSetDescription& description);
// Called by EventProcessor when the untracked "dumpOptions" option is true,
// with the thread/stream/lumi/run counts as they stand after zero values have
// been replaced, limits applied and any looper override taken into account.
// NOTE(review): the definition is not visible here; presumably it writes lines
// such as "Number of Streams = N" that run_testOptions.sh greps for -- confirm.
void dumpOptionsToLogFile(unsigned int nThreads,
unsigned int nStreams,
unsigned int nConcurrentLumis,
unsigned int nConcurrentRuns);
} // namespace edm
#endif
Loading