-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable concurrent lumis and IOVs by default when number of streams is at least 2 #34231
Changes from 1 commit
d844110
d4b083e
5baf5c3
804766c
72d1599
755296a
8a8e0a8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -209,20 +209,14 @@ namespace edm { | |
|
||
std::shared_ptr<EDLooperBase> fillLooper(eventsetup::EventSetupsController& esController, | ||
eventsetup::EventSetupProvider& cp, | ||
ParameterSet& params) { | ||
ParameterSet& params, | ||
std::vector<std::string> const& loopers) { | ||
std::shared_ptr<EDLooperBase> vLooper; | ||
|
||
std::vector<std::string> loopers = params.getParameter<std::vector<std::string>>("@all_loopers"); | ||
|
||
if (loopers.empty()) { | ||
return vLooper; | ||
} | ||
|
||
assert(1 == loopers.size()); | ||
|
||
for (std::vector<std::string>::iterator itName = loopers.begin(), itNameEnd = loopers.end(); itName != itNameEnd; | ||
++itName) { | ||
ParameterSet* providerPSet = params.getPSetForUpdate(*itName); | ||
for (auto const& looperName : loopers) { | ||
ParameterSet* providerPSet = params.getPSetForUpdate(looperName); | ||
validateLooper(*providerPSet); | ||
providerPSet->registerIt(); | ||
vLooper = eventsetup::LooperFactory::get()->addTo(esController, cp, *providerPSet); | ||
|
@@ -386,9 +380,6 @@ namespace edm { | |
if (nStreams == 0) { | ||
nStreams = nThreads; | ||
} | ||
if (nThreads > 1 or nStreams > 1) { | ||
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams; | ||
} | ||
unsigned int nConcurrentRuns = optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentRuns"); | ||
if (nConcurrentRuns != 1) { | ||
throw Exception(errors::Configuration, "Illegal value nConcurrentRuns : ") | ||
|
@@ -397,7 +388,24 @@ namespace edm { | |
unsigned int nConcurrentLumis = | ||
optionsPset.getUntrackedParameter<unsigned int>("numberOfConcurrentLuminosityBlocks"); | ||
if (nConcurrentLumis == 0) { | ||
nConcurrentLumis = nConcurrentRuns; | ||
nConcurrentLumis = 2; | ||
} | ||
if (nConcurrentLumis > nStreams) { | ||
nConcurrentLumis = nStreams; | ||
} | ||
std::vector<std::string> loopers = parameterSet->getParameter<std::vector<std::string>>("@all_loopers"); | ||
if (!loopers.empty()) { | ||
//For now loopers make us run only 1 transition at a time | ||
nStreams = 1; | ||
nConcurrentLumis = 1; | ||
nConcurrentRuns = 1; | ||
} | ||
if (nThreads > 1 or nStreams > 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we move this to be within an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. This seems better. We don't need the LogInfo stuff in the unit test output. I viewed the dumpOptions as something for Framework unit tests. I am wondering if it could get used for other purposes... I tried to leave the other output unmodified, just in case there is something that uses it and depends on it being in the log file. (I'll push all the commits with changes when I get them all done.) |
||
edm::LogInfo("ThreadStreamSetup") << "setting # threads " << nThreads << "\nsetting # streams " << nStreams; | ||
} | ||
bool dumpOptions = optionsPset.getUntrackedParameter<bool>("dumpOptions"); | ||
if (dumpOptions) { | ||
dumpOptionsToLogFile(nThreads, nStreams, nConcurrentLumis, nConcurrentRuns); | ||
} | ||
|
||
//Check that relationships between threading parameters makes sense | ||
|
@@ -439,18 +447,15 @@ namespace edm { | |
|
||
// intialize the event setup provider | ||
ParameterSet const& eventSetupPset(optionsPset.getUntrackedParameterSet("eventSetup")); | ||
esp_ = espController_->makeProvider(*parameterSet, items.actReg_.get(), &eventSetupPset); | ||
esp_ = espController_->makeProvider( | ||
*parameterSet, items.actReg_.get(), &eventSetupPset, nConcurrentLumis, dumpOptions); | ||
|
||
// initialize the looper, if any | ||
looper_ = fillLooper(*espController_, *esp_, *parameterSet); | ||
if (looper_) { | ||
if (!loopers.empty()) { | ||
looper_ = fillLooper(*espController_, *esp_, *parameterSet, loopers); | ||
looper_->setActionTable(items.act_table_.get()); | ||
looper_->attachTo(*items.actReg_); | ||
|
||
//For now loopers make us run only 1 transition at a time | ||
nStreams = 1; | ||
nConcurrentLumis = 1; | ||
nConcurrentRuns = 1; | ||
// in presence of looper do not delete modules | ||
deleteNonConsumedUnscheduledModules_ = false; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,9 @@ namespace edm { | |
|
||
std::shared_ptr<EventSetupProvider> EventSetupsController::makeProvider(ParameterSet& iPSet, | ||
ActivityRegistry* activityRegistry, | ||
ParameterSet const* eventSetupPset) { | ||
ParameterSet const* eventSetupPset, | ||
unsigned int nConcurrentLumis, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we change this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made some improvements here. The behavior is exactly the same, but hopefully it is more readable and understandable. Below are the changes in EventProcessor.cc with appropriate changes also in the lower level code (I'll push all these commits as soon as I finish all of them). I noticed we no longer need the separate function setMaxConcurrentIOVs since I moved the code that resets things if there is a looper.
|
||
bool dumpOptions) { | ||
// Makes an EventSetupProvider | ||
// Also parses the prefer information from ParameterSets and puts | ||
// it in a map that is stored in the EventSetupProvider | ||
|
@@ -53,7 +55,7 @@ namespace edm { | |
// EventSetupsController and in the EventSetupProvider | ||
fillEventSetupProvider(*this, *returnValue, iPSet); | ||
|
||
numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset); | ||
numberOfConcurrentIOVs_.readConfigurationParameters(eventSetupPset, nConcurrentLumis, dumpOptions); | ||
|
||
providers_.push_back(returnValue); | ||
return returnValue; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,7 +88,9 @@ namespace edm { | |
|
||
std::shared_ptr<EventSetupProvider> makeProvider(ParameterSet&, | ||
ActivityRegistry*, | ||
ParameterSet const* eventSetupPset = nullptr); | ||
ParameterSet const* eventSetupPset = nullptr, | ||
unsigned int nConcurrentLumis = 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above |
||
bool dumpOptions = false); | ||
|
||
void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,16 @@ namespace edm { | |
|
||
NumberOfConcurrentIOVs::NumberOfConcurrentIOVs() : numberConcurrentIOVs_(1) {} | ||
|
||
void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset) { | ||
void NumberOfConcurrentIOVs::readConfigurationParameters(ParameterSet const* eventSetupPset, | ||
unsigned int nConcurrentLumis, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above |
||
bool dumpOptions) { | ||
if (eventSetupPset) { // this condition is false for SubProcesses | ||
numberConcurrentIOVs_ = eventSetupPset->getUntrackedParameter<unsigned int>("numberOfConcurrentIOVs"); | ||
if (numberConcurrentIOVs_ == 0) { | ||
numberConcurrentIOVs_ = 1; | ||
if (numberConcurrentIOVs_ == 0 || numberConcurrentIOVs_ > nConcurrentLumis) { | ||
numberConcurrentIOVs_ = nConcurrentLumis; | ||
} | ||
if (dumpOptions) { | ||
LogAbsolute("Options") << "Number of Concurrent IOVs = " << numberConcurrentIOVs_; | ||
} | ||
|
||
ParameterSet const& pset(eventSetupPset->getUntrackedParameterSet("forceNumberOfConcurrentIOVs")); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,9 @@ namespace edm { | |
public: | ||
NumberOfConcurrentIOVs(); | ||
|
||
void readConfigurationParameters(ParameterSet const* eventSetupPset); | ||
void readConfigurationParameters(ParameterSet const* eventSetupPset, | ||
unsigned int nConcurrentLumis, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above |
||
bool dumpOptions); | ||
|
||
// Can't have more concurrent IOVs than streams or concurrent lumis | ||
void setMaxConcurrentIOVs(unsigned int nStreams, unsigned int nConcurrentLumis); | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -36,6 +36,11 @@ | |||||||||||||||||||||||
<use name="FWCore/Utilities"/> | ||||||||||||||||||||||||
</bin> | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
<bin name="TestFWCoreFrameworkOptions" file="TestDriver.cpp"> | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is scram support now for specifying running a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @smuzaffar I am not familiar with this (I've never seen it or forgot...). Is there is an example or documentation somewhere? Does it work the same way and set the same environmental variables as the old way? Here is the script I want to run:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here cmssw/FWCore/SharedMemory/test/BuildFile.xml Lines 17 to 19 in 98e05a9
cmssw/FWCore/Framework/test/BuildFile.xml Lines 393 to 395 in 98e05a9
cmssw/FWCore/Integration/test/BuildFile.xml Lines 447 to 451 in 98e05a9
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wddgit , it works nearly the same way ( see examples of it https://github.com/cms-sw/cmssw/blob/master/PhysicsTools/PythonAnalysis/test/BuildFile.xml ) but it does not set the env variables set here https://github.com/cms-sw/cmssw/blob/master/FWCore/Utilities/src/TestHelper.cc#L149-L164 . But most of these you can calculate within the script itself. So basically you just need to use
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @wddgit , if the tests in your script do not depend on each other then better to have a small script e.g.
and then add multiple tests e.g.
this way scram can run these in parallel. Only thing you need to make sure that these tests do not write in to same output file There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mostly did what was suggested here. The main difference is instead of putting so much in the BuildFile I passed only a single index to identify the test and put the config file names and expected results in arrays in the bash script. It seemed to be too much to put expected results in a BuildFile. Overall, this seems much better. Thanks. |
||||||||||||||||||||||||
<flags TEST_RUNNER_ARGS=" /bin/bash FWCore/Framework/test run_testOptions.sh"/> | ||||||||||||||||||||||||
<use name="FWCore/Utilities"/> | ||||||||||||||||||||||||
</bin> | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
<library file="stubs/TestTriggerNames.cc" name="TestTriggerNames"> | ||||||||||||||||||||||||
<flags EDM_PLUGIN="1"/> | ||||||||||||||||||||||||
<use name="DataFormats/Common"/> | ||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
#!/bin/bash | ||
|
||
function die { echo Failure $1: status $2 ; exit $2 ; } | ||
|
||
pushd ${LOCAL_TMP_DIR} | ||
|
||
echo testOptions1_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions1_cfg.py >& testOptions1.log || die "cmsRun testOptions1_cfg.py" $? | ||
grep "Number of Streams = 1" testOptions1.log || die "Failed number of streams test" $? | ||
grep "Number of Concurrent Lumis = 1" testOptions1.log || die "Failed number of concurrent lumis test" $? | ||
grep "Number of Concurrent IOVs = 1" testOptions1.log || die "Failed number of concurrent IOVs test" $? | ||
|
||
echo testOptions2_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions2_cfg.py >& testOptions2.log || die "cmsRun testOptions2_cfg.py" $? | ||
grep "Number of Streams = 5" testOptions2.log || die "Failed number of streams test" $? | ||
grep "Number of Concurrent Lumis = 4" testOptions2.log || die "Failed number of concurrent lumis test" $? | ||
grep "Number of Concurrent IOVs = 3" testOptions2.log || die "Failed number of concurrent IOVs test" $? | ||
|
||
echo testOptions3_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions3_cfg.py >& testOptions3.log || die "cmsRun testOptions3_cfg.py" $? | ||
grep "Number of Streams = 6" testOptions3.log || die "Failed number of streams test" $? | ||
grep "Number of Concurrent Lumis = 2" testOptions3.log || die "Failed number of concurrent lumis test" $? | ||
grep "Number of Concurrent IOVs = 2" testOptions3.log || die "Failed number of concurrent IOVs test" $? | ||
|
||
echo testOptions4_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions4_cfg.py >& testOptions4.log || die "cmsRun testOptions4_cfg.py" $? | ||
grep "Number of Streams = 6" testOptions4.log || die "Failed number of streams test" $? | ||
grep "Number of Concurrent Lumis = 6" testOptions4.log || die "Failed number of concurrent lumis test" $? | ||
grep "Number of Concurrent IOVs = 6" testOptions4.log || die "Failed number of concurrent IOVs test" $? | ||
|
||
echo testOptions5_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions5_cfg.py >& testOptions5.log || die "cmsRun testOptions5_cfg.py" $? | ||
grep "Number of Streams = 1" testOptions5.log || die "Failed number of streams test" $? | ||
grep "Number of Concurrent Lumis = 1" testOptions5.log || die "Failed number of concurrent lumis test" $? | ||
grep "Number of Concurrent IOVs = 1" testOptions5.log || die "Failed number of concurrent IOVs test" $? | ||
|
||
echo testOptions6_cfg.py | ||
cmsRun -p ${LOCAL_TEST_DIR}/testOptions6_cfg.py >& testOptions6.log || die "cmsRun testOptions6_cfg.py" $? | ||
# Cannot run the grep tests because by default the options are not dumped. | ||
# You can however run this manually with a debugger and check (which was done) | ||
# And also just run it and see it doesn't crash... | ||
|
||
rm testOptions1.log | ||
rm testOptions2.log | ||
rm testOptions3.log | ||
rm testOptions4.log | ||
rm testOptions5.log | ||
rm testOptions6.log | ||
|
||
popd | ||
|
||
exit 0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Test of options when all parameters taken from Config.py | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) | ||
process.options = dict ( | ||
dumpOptions = True | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. It works. It is a little more concise. Thanks. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Test of options when all parameters explicitly set | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) | ||
|
||
process.options = dict( | ||
dumpOptions = True, | ||
numberOfThreads = 6, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should probably keep the # threads to 4 or less as many IB VMs are only 4 threaded machines. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. I reduced the number of threads to 4 when it was greater than 4 in this file and the other configurations as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wondering. For each individual test I limited this to 4 concurrent threads. But does this resolve all the problems if scram is running multiple tests concurrently? I was wondering how that works. Does each test run on a different VM? |
||
numberOfStreams = 5, | ||
numberOfConcurrentLuminosityBlocks = 4, | ||
eventSetup = dict( | ||
numberOfConcurrentIOVs = 3 | ||
) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Test of options when parameters are zero | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) | ||
|
||
process.options = dict( | ||
dumpOptions = True, | ||
numberOfThreads = 6, | ||
numberOfStreams = 0, | ||
numberOfConcurrentLuminosityBlocks = 0, | ||
eventSetup = dict( | ||
numberOfConcurrentIOVs = 0 | ||
) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Test of options when concurrentIOVs and concurrentLumis are too big | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) | ||
|
||
process.options = dict( | ||
dumpOptions = True, | ||
numberOfThreads = 6, | ||
numberOfStreams = 6, | ||
numberOfConcurrentLuminosityBlocks = 7, | ||
eventSetup = dict( | ||
numberOfConcurrentIOVs = 7 | ||
) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Test of options when a looper is in the job | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) | ||
|
||
process.options = dict( | ||
dumpOptions = True, | ||
numberOfThreads = 6, | ||
numberOfStreams = 6, | ||
numberOfConcurrentLuminosityBlocks = 7, | ||
eventSetup = dict( | ||
numberOfConcurrentIOVs = 7 | ||
) | ||
) | ||
|
||
process.looper = cms.Looper("DummyLooper", | ||
value = cms.untracked.int32(4) | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Test of options when parameter defaults coming from descriptions | ||
import FWCore.ParameterSet.Config as cms | ||
process = cms.Process("TEST") | ||
process.source = cms.Source("EmptySource") | ||
process.maxEvents = cms.untracked.PSet( | ||
input = cms.untracked.int32(1) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably add a
LogWarning
if this changes the settings. I know we didn't have that before, but that was probably bad.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, I added the LogWarning. Thanks.