Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve IndexIntoFile for concurrent lumis/runs #37532

Merged
merged 12 commits into from
May 5, 2022
215 changes: 192 additions & 23 deletions DataFormats/Provenance/interface/IndexIntoFile.h

Large diffs are not rendered by default.

717 changes: 577 additions & 140 deletions DataFormats/Provenance/src/IndexIntoFile.cc

Large diffs are not rendered by default.

408 changes: 404 additions & 4 deletions DataFormats/Provenance/test/indexIntoFile2_t.cppunit.cc

Large diffs are not rendered by default.

434 changes: 406 additions & 28 deletions DataFormats/Provenance/test/indexIntoFile3_t.cppunit.cc

Large diffs are not rendered by default.

519 changes: 517 additions & 2 deletions DataFormats/Provenance/test/indexIntoFile_t.cppunit.cc

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions FWCore/Framework/interface/LuminosityBlockPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace edm {
public:
typedef LuminosityBlockAuxiliary Auxiliary;
typedef Principal Base;

LuminosityBlockPrincipal(std::shared_ptr<ProductRegistry const> reg,
ProcessConfiguration const& pc,
HistoryAppender* historyAppender,
Expand All @@ -48,8 +49,6 @@ namespace edm {

void setRunPrincipal(std::shared_ptr<RunPrincipal> rp) { runPrincipal_ = rp; }

void setWillBeContinued(bool iContinued) { willBeContinued_ = iContinued; }

LuminosityBlockIndex index() const { return index_; }

LuminosityBlockID id() const { return aux().id(); }
Expand All @@ -73,8 +72,9 @@ namespace edm {

void put(ProductResolverIndex index, std::unique_ptr<WrapperBase> edp) const;

///The source is replaying overlapping LuminosityBlocks and this is not the last part for this LuminosityBlock
bool willBeContinued() const { return willBeContinued_; }
/// Tri-state flag recording whether this LuminosityBlock should be written.
/// The stored value starts out as kUninitialized.
/// NOTE(review): EventProcessor::writeLumiAsync only tests `!= kNo`, so
/// kUninitialized appears to be handled like kYes there -- confirm this is intended.
enum ShouldWriteLumi { kUninitialized, kNo, kYes };
/// Returns the write decision currently stored for this LuminosityBlock.
ShouldWriteLumi shouldWriteLumi() const { return shouldWriteLumi_; }
/// Stores the write decision for this LuminosityBlock.
void setShouldWriteLumi(ShouldWriteLumi value) { shouldWriteLumi_ = value; }

private:
unsigned int transitionIndex_() const override;
Expand All @@ -85,7 +85,7 @@ namespace edm {

LuminosityBlockIndex index_;

bool willBeContinued_ = false;
ShouldWriteLumi shouldWriteLumi_ = kUninitialized;
};
} // namespace edm
#endif
6 changes: 6 additions & 0 deletions FWCore/Framework/interface/RunPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ namespace edm {

void preReadFile();

/// Tri-state flag recording whether this Run should be written.
/// The stored value starts out as kUninitialized.
/// NOTE(review): EventProcessor skips the writeRunAsync call only when the value
/// is kNo, so kUninitialized appears to be handled like kYes there -- confirm this is intended.
enum ShouldWriteRun { kUninitialized, kNo, kYes };
/// Returns the write decision currently stored for this Run.
ShouldWriteRun shouldWriteRun() const { return shouldWriteRun_; }
/// Stores the write decision for this Run.
void setShouldWriteRun(ShouldWriteRun value) { shouldWriteRun_ = value; }

private:
unsigned int transitionIndex_() const override;

Expand All @@ -93,6 +97,8 @@ namespace edm {
// there should be one MergeableRunProductMetadata object created
// per concurrent run. In all other cases, this should just be null.
edm::propagate_const<std::unique_ptr<MergeableRunProductMetadata>> mergeableRunProductMetadataPtr_;

ShouldWriteRun shouldWriteRun_ = kUninitialized;
};
} // namespace edm
#endif
25 changes: 14 additions & 11 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1232,17 +1232,19 @@ namespace edm {
endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);

if (globalBeginSucceeded) {
FinalWaitingTask t;
RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
mergeableRunProductMetadata->preWriteRun();
writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
do {
taskGroup_.wait();
} while (not t.done());
mergeableRunProductMetadata->postWriteRun();
if (t.exceptionPtr()) {
std::rethrow_exception(*t.exceptionPtr());
if (runPrincipal.shouldWriteRun() != RunPrincipal::kNo) {
FinalWaitingTask t;
MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
mergeableRunProductMetadata->preWriteRun();
writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
do {
taskGroup_.wait();
} while (not t.done());
mergeableRunProductMetadata->postWriteRun();
if (t.exceptionPtr()) {
std::rethrow_exception(*t.exceptionPtr());
}
}
}
}
Expand Down Expand Up @@ -1782,7 +1784,7 @@ namespace edm {

void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
using namespace edm::waiting_task;
if (not lumiPrincipal.willBeContinued()) {
if (lumiPrincipal.shouldWriteLumi() != LuminosityBlockPrincipal::kNo) {
chain::first([&](auto nextTask) {
ServiceRegistry::Operate op(serviceToken_);

Expand All @@ -1801,6 +1803,7 @@ namespace edm {
for (auto& s : subProcesses_) {
s.deleteLumiFromCache(*iStatus.lumiPrincipal());
}
iStatus.lumiPrincipal()->setShouldWriteLumi(LuminosityBlockPrincipal::kUninitialized);
iStatus.lumiPrincipal()->clearPrincipal();
//FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
}
Expand Down
1 change: 0 additions & 1 deletion FWCore/Framework/src/LuminosityBlockPrincipal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace edm {
void LuminosityBlockPrincipal::fillLuminosityBlockPrincipal(ProcessHistory const* processHistory,
DelayedReader* reader) {
fillPrincipal(aux_.processHistoryID(), processHistory, reader);
willBeContinued_ = false;
}

void LuminosityBlockPrincipal::put(BranchDescription const& bd, std::unique_ptr<WrapperBase> edp) const {
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Integration/test/run_RunMerge.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ pushd ${LOCAL_TMP_DIR}
echo ${test}COPY1------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}COPY1_cfg.py || die "cmsRun ${test}COPY1_cfg.py" $?

# Run the MERGE6 configuration; abort through die with cmsRun's exit status on failure.
# NOTE(review): assuming ${test} expands to testRunMerge, this step writes
# testRunMergeMERGE6.root, which the NoRunLumiSort step below reads as input,
# so the order of these two steps matters -- confirm.
echo ${test}MERGE6------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}MERGE6_cfg.py || die "cmsRun ${test}MERGE6_cfg.py" $?

# Run the NoRunLumiSort configuration; abort through die on failure.
echo ${test}NoRunLumiSort------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}NoRunLumiSort_cfg.py || die "cmsRun ${test}NoRunLumiSort_cfg.py" $?

# Run the TEST6 configuration; abort through die on failure.
echo ${test}TEST6------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}TEST6_cfg.py || die "cmsRun ${test}TEST6_cfg.py" $?

echo ${test}PickEvents------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}PickEvents_cfg.py || die "cmsRun ${test}PickEvents_cfg.py" $?

Expand Down
106 changes: 106 additions & 0 deletions FWCore/Integration/test/testRunMergeMERGE6_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# The purpose of this configuration is to prepare input
# for a test of the noRunLumiSort configuration parameter.
# The prepared input has non-contiguous sequences of
# events from the same run (and the same lumi).

# Six merge-related warnings are expected to be
# printed while this runs. The test should pass
# despite these warnings.

import FWCore.ParameterSet.Config as cms

# Process named "MERGE"; the source, modules and paths below are attached to it.
process = cms.Process("MERGE")

# Input: the three testRunMerge files, read through a PoolSource.
# inputCommands starts with 'keep *' and then applies the listed
# 'drop' patterns.
process.source = cms.Source("PoolSource",
fileNames = cms.untracked.vstring(
'file:testRunMerge1.root',
'file:testRunMerge2.root',
'file:testRunMerge3.root'
),
inputCommands = cms.untracked.vstring(
'keep *',
'drop *_A_*_*',
'drop *_B_*_*',
'drop *_C_*_*',
'drop *_D_*_*',
'drop *_E_*_*',
'drop *_F_*_*',
'drop *_G_*_*',
'drop *_H_*_*',
'drop *_I_*_*',
'drop *_J_*_*',
'drop *_K_*_*',
'drop *_L_*_*',
'drop *_tryNoPut_*_*',
'drop *_aliasForThingToBeDropped2_*_*',
'drop *_dependsOnThingToBeDropped1_*_*',
'drop *_makeThingToBeDropped_*_*',
'drop edmtestThingWithMerge_makeThingToBeDropped1_*_*',
'drop edmtestThing_*_*_*'
)
)

# Producer module of type ThingWithMergeProducer; it is listed ahead of
# process.test in path1.
process.thingWithMergeProducer = cms.EDProducer("ThingWithMergeProducer")

# Analyzer module of type TestMergeResults, configured with the expected
# values described below.
process.test = cms.EDAnalyzer("TestMergeResults",

# Check that the values read match what is known to have been
# written. The expected values listed below come in sets of three:
#   value expected in Thing
#   value expected in ThingWithMerge
#   value expected in ThingWithIsEqual
# Each set of three is tested at endRun (for the expected run
# values) or at endLuminosityBlock (for the expected lumi values);
# the next set of three is then tested at the next endRun or
# endLuminosityBlock. Checking stops once the sequence of parameter
# values is exhausted. A 0 is only a placeholder: when the expected
# value is 0, the check is not made.

expectedBeginRunProd = cms.untracked.vint32(
0, 20004, 10003, # File boundary before this causing merge
0, 10002, 10003,
0, 10002, 10004
),
expectedEndRunProd = cms.untracked.vint32(
0, 200004, 100003, # File boundary before this causing merge
0, 100002, 100003,
0, 100002, 100004
),
expectedBeginLumiProd = cms.untracked.vint32(
0, 204, 103, # File boundary before this causing merge
0, 102, 103,
0, 102, 104
),
expectedEndLumiProd = cms.untracked.vint32(
0, 2004, 1003, # File boundary before this causing merge
0, 1002, 1003,
0, 1002, 1004
),
expectedBeginRunNew = cms.untracked.vint32(
0, 10002, 10003,
0, 10002, 10003,
0, 10002, 10003
),
expectedEndRunNew = cms.untracked.vint32(
0, 100002, 100003,
0, 100002, 100003,
0, 100002, 100003
),
expectedBeginLumiNew = cms.untracked.vint32(
0, 102, 103,
0, 102, 103,
0, 102, 103
),
expectedEndLumiNew = cms.untracked.vint32(
0, 1002, 1003,
0, 1002, 1003,
0, 1002, 1003
)
)

# Output module of type PoolOutputModule writing testRunMergeMERGE6.root.
process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('testRunMergeMERGE6.root')
)

# path1 lists the producer followed by the analyzer; the output module
# is placed on an EndPath.
process.path1 = cms.Path(process.thingWithMergeProducer + process.test)
process.e = cms.EndPath(process.out)
116 changes: 116 additions & 0 deletions FWCore/Integration/test/testRunMergeNoRunLumiSort_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# A test of the noRunLumiSort configuration parameter
# where the input has non-contiguous event sequences
# from the same run.

# Eight warnings and eight errors related to merging
# are expected to be printed while this runs.
# The test should pass despite these errors and warnings.

import FWCore.ParameterSet.Config as cms

# Process named "NORUNLUMISORT"; the source, modules and paths below are attached to it.
process = cms.Process("NORUNLUMISORT")

# Input: the same file, testRunMergeMERGE6.root, listed twice and read
# through a PoolSource with noRunLumiSort set to True and
# duplicateCheckMode set to 'checkEachRealDataFile'.
process.source = cms.Source("PoolSource",
fileNames = cms.untracked.vstring(
'file:testRunMergeMERGE6.root',
'file:testRunMergeMERGE6.root'
),
duplicateCheckMode = cms.untracked.string('checkEachRealDataFile'),
noRunLumiSort = cms.untracked.bool(True)
)

# Analyzer module of type RunLumiEventAnalyzer with verbose set to True.
# NOTE(review): expectedRunLumiEvents looks like a flat list of
# (run, lumi, event) triples, one triple per line, where a 0 in the
# lumi/event position presumably marks a run or lumi transition rather
# than an event -- confirm against RunLumiEventAnalyzer.
process.test2 = cms.EDAnalyzer('RunLumiEventAnalyzer',
verbose = cms.untracked.bool(True),
expectedRunLumiEvents = cms.untracked.vuint32(
1, 0, 0,
1, 1, 0,
1, 1, 11,
1, 1, 12,
1, 1, 13,
1, 1, 14,
1, 1, 15,
1, 1, 16,
1, 1, 17,
1, 1, 18,
1, 1, 19,
1, 1, 20,
1, 1, 21,
1, 1, 22,
1, 1, 23,
1, 1, 24,
1, 1, 25,
1, 1, 0,
1, 0, 0,
2, 0, 0,
2, 1, 0,
2, 1, 1,
2, 1, 2,
2, 1, 3,
2, 1, 4,
2, 1, 5,
2, 1, 0,
2, 0, 0,
1, 0, 0,
1, 1, 0,
1, 1, 1,
1, 1, 2,
1, 1, 3,
1, 1, 4,
1, 1, 5,
1, 1, 6,
1, 1, 7,
1, 1, 8,
1, 1, 9,
1, 1, 10
)
)
# The expected sequence continues here: these values are appended to the
# list built above, so the order of the two parts matters.
process.test2.expectedRunLumiEvents.extend([
1, 1, 11,
1, 1, 12,
1, 1, 13,
1, 1, 14,
1, 1, 15,
1, 1, 16,
1, 1, 17,
1, 1, 18,
1, 1, 19,
1, 1, 20,
1, 1, 21,
1, 1, 22,
1, 1, 23,
1, 1, 24,
1, 1, 25,
1, 1, 0,
1, 0, 0,
2, 0, 0,
2, 1, 0,
2, 1, 1,
2, 1, 2,
2, 1, 3,
2, 1, 4,
2, 1, 5,
2, 1, 0,
2, 0, 0,
1, 0, 0,
1, 1, 0,
1, 1, 1,
1, 1, 2,
1, 1, 3,
1, 1, 4,
1, 1, 5,
1, 1, 6,
1, 1, 7,
1, 1, 8,
1, 1, 9,
1, 1, 10,
1, 1, 0,
1, 0, 0,
])

# Output module of type PoolOutputModule writing testRunMergeNoRunLumiSort.root.
process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('testRunMergeNoRunLumiSort.root')
)

# path1 contains only the test2 analyzer.
process.path1 = cms.Path(process.test2)

# The output module is placed on an EndPath.
process.e = cms.EndPath(process.out)
Loading