Skip to content

Commit

Permalink
Improve IndexIntoFile for concurrent lumis/runs
Browse files Browse the repository at this point in the history
  • Loading branch information
wddgit committed Apr 11, 2022
1 parent 48f1b48 commit ec0a81d
Show file tree
Hide file tree
Showing 20 changed files with 2,521 additions and 232 deletions.
179 changes: 160 additions & 19 deletions DataFormats/Provenance/interface/IndexIntoFile.h

Large diffs are not rendered by default.

695 changes: 563 additions & 132 deletions DataFormats/Provenance/src/IndexIntoFile.cc

Large diffs are not rendered by default.

408 changes: 404 additions & 4 deletions DataFormats/Provenance/test/indexIntoFile2_t.cppunit.cc

Large diffs are not rendered by default.

434 changes: 406 additions & 28 deletions DataFormats/Provenance/test/indexIntoFile3_t.cppunit.cc

Large diffs are not rendered by default.

519 changes: 517 additions & 2 deletions DataFormats/Provenance/test/indexIntoFile_t.cppunit.cc

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions FWCore/Framework/interface/LuminosityBlockPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace edm {
public:
typedef LuminosityBlockAuxiliary Auxiliary;
typedef Principal Base;

LuminosityBlockPrincipal(std::shared_ptr<ProductRegistry const> reg,
ProcessConfiguration const& pc,
HistoryAppender* historyAppender,
Expand All @@ -48,8 +49,6 @@ namespace edm {

void setRunPrincipal(std::shared_ptr<RunPrincipal> rp) { runPrincipal_ = rp; }

void setWillBeContinued(bool iContinued) { willBeContinued_ = iContinued; }

LuminosityBlockIndex index() const { return index_; }

LuminosityBlockID id() const { return aux().id(); }
Expand All @@ -73,8 +72,9 @@ namespace edm {

void put(ProductResolverIndex index, std::unique_ptr<WrapperBase> edp) const;

///The source is replaying overlapping LuminosityBlocks and this is not the last part for this LumiosityBlock
bool willBeContinued() const { return willBeContinued_; }
enum ContinueState { kUninitialized, kWillBeContinued, kProcessLumi };
ContinueState continueState() const { return continueState_; }
void setContinueState(ContinueState value) { continueState_ = value; }

private:
unsigned int transitionIndex_() const override;
Expand All @@ -85,7 +85,7 @@ namespace edm {

LuminosityBlockIndex index_;

bool willBeContinued_ = false;
ContinueState continueState_ = kUninitialized;
};
} // namespace edm
#endif
6 changes: 6 additions & 0 deletions FWCore/Framework/interface/RunPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ namespace edm {

void preReadFile();

enum ContinueState { kUninitialized, kWillBeContinued, kProcessRun };
ContinueState continueState() const { return continueState_; }
void setContinueState(ContinueState value) { continueState_ = value; }

private:
unsigned int transitionIndex_() const override;

Expand All @@ -93,6 +97,8 @@ namespace edm {
// there should be one MergeableRunProductMetadata object created
// per concurrent run. In all other cases, this should just be null.
edm::propagate_const<std::unique_ptr<MergeableRunProductMetadata>> mergeableRunProductMetadataPtr_;

ContinueState continueState_ = kUninitialized;
};
} // namespace edm
#endif
25 changes: 14 additions & 11 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1232,17 +1232,19 @@ namespace edm {
endRun(phid, run, globalBeginSucceeded, cleaningUpAfterException);

if (globalBeginSucceeded) {
FinalWaitingTask t;
RunPrincipal& runPrincipal = principalCache_.runPrincipal(phid, run);
MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
mergeableRunProductMetadata->preWriteRun();
writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
do {
taskGroup_.wait();
} while (not t.done());
mergeableRunProductMetadata->postWriteRun();
if (t.exceptionPtr()) {
std::rethrow_exception(*t.exceptionPtr());
if (runPrincipal.continueState() != RunPrincipal::kWillBeContinued) {
FinalWaitingTask t;
MergeableRunProductMetadata* mergeableRunProductMetadata = runPrincipal.mergeableRunProductMetadata();
mergeableRunProductMetadata->preWriteRun();
writeRunAsync(edm::WaitingTaskHolder{taskGroup_, &t}, phid, run, mergeableRunProductMetadata);
do {
taskGroup_.wait();
} while (not t.done());
mergeableRunProductMetadata->postWriteRun();
if (t.exceptionPtr()) {
std::rethrow_exception(*t.exceptionPtr());
}
}
}
}
Expand Down Expand Up @@ -1782,7 +1784,7 @@ namespace edm {

void EventProcessor::writeLumiAsync(WaitingTaskHolder task, LuminosityBlockPrincipal& lumiPrincipal) {
using namespace edm::waiting_task;
if (not lumiPrincipal.willBeContinued()) {
if (lumiPrincipal.continueState() != LuminosityBlockPrincipal::kWillBeContinued) {
chain::first([&](auto nextTask) {
ServiceRegistry::Operate op(serviceToken_);

Expand All @@ -1801,6 +1803,7 @@ namespace edm {
for (auto& s : subProcesses_) {
s.deleteLumiFromCache(*iStatus.lumiPrincipal());
}
iStatus.lumiPrincipal()->setContinueState(LuminosityBlockPrincipal::kUninitialized);
iStatus.lumiPrincipal()->clearPrincipal();
//FDEBUG(1) << "\tdeleteLumiFromCache " << run << "/" << lumi << "\n";
}
Expand Down
1 change: 0 additions & 1 deletion FWCore/Framework/src/LuminosityBlockPrincipal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace edm {
void LuminosityBlockPrincipal::fillLuminosityBlockPrincipal(ProcessHistory const* processHistory,
DelayedReader* reader) {
fillPrincipal(aux_.processHistoryID(), processHistory, reader);
willBeContinued_ = false;
}

void LuminosityBlockPrincipal::put(BranchDescription const& bd, std::unique_ptr<WrapperBase> edp) const {
Expand Down
9 changes: 9 additions & 0 deletions FWCore/Integration/test/run_RunMerge.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ pushd ${LOCAL_TMP_DIR}
echo ${test}COPY1------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}COPY1_cfg.py || die "cmsRun ${test}COPY1_cfg.py" $?

echo ${test}MERGE6------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}MERGE6_cfg.py || die "cmsRun ${test}MERGE6_cfg.py" $?

echo ${test}NoRunLumiSort------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}NoRunLumiSort_cfg.py || die "cmsRun ${test}NoRunLumiSort_cfg.py" $?

echo ${test}TEST6------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}TEST6_cfg.py || die "cmsRun ${test}TEST6_cfg.py" $?

echo ${test}PickEvents------------------------------------------------------------
cmsRun -p ${LOCAL_TEST_DIR}/${test}PickEvents_cfg.py || die "cmsRun ${test}PickEvents_cfg.py" $?

Expand Down
106 changes: 106 additions & 0 deletions FWCore/Integration/test/testRunMergeMERGE6_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# The purpose of this configuration is to prepare
# input for a test of the noRunLumiSort configuration
# parameter that has non-contiguous sequences of
# events from the same run (and the same lumi).

# It is expected there are 6 warnings that
# print out while this runs related to merging.
# The test should pass with these warnings.

import FWCore.ParameterSet.Config as cms

process = cms.Process("MERGE")

process.source = cms.Source("PoolSource",
fileNames = cms.untracked.vstring(
'file:testRunMerge1.root',
'file:testRunMerge2.root',
'file:testRunMerge3.root'
),
inputCommands = cms.untracked.vstring(
'keep *',
'drop *_A_*_*',
'drop *_B_*_*',
'drop *_C_*_*',
'drop *_D_*_*',
'drop *_E_*_*',
'drop *_F_*_*',
'drop *_G_*_*',
'drop *_H_*_*',
'drop *_I_*_*',
'drop *_J_*_*',
'drop *_K_*_*',
'drop *_L_*_*',
'drop *_tryNoPut_*_*',
'drop *_aliasForThingToBeDropped2_*_*',
'drop *_dependsOnThingToBeDropped1_*_*',
'drop *_makeThingToBeDropped_*_*',
'drop edmtestThingWithMerge_makeThingToBeDropped1_*_*',
'drop edmtestThing_*_*_*'
)
)

process.thingWithMergeProducer = cms.EDProducer("ThingWithMergeProducer")

process.test = cms.EDAnalyzer("TestMergeResults",

# Check to see that the value we read matches what we know
# was written. Expected values listed below come in sets of three
# value expected in Thing
# value expected in ThingWithMerge
# value expected in ThingWithIsEqual
# Each set of 3 is tested at endRun for the expected
# run values or at endLuminosityBlock for the expected
# lumi values. And then the next set of three values
# is tested at the next endRun or endLuminosityBlock.
# When the sequence of parameter values is exhausted it stops checking
# 0's are just placeholders, if the value is a "0" the check is not made.

expectedBeginRunProd = cms.untracked.vint32(
0, 20004, 10003, # File boundary before this causing merge
0, 10002, 10003,
0, 10002, 10004
),
expectedEndRunProd = cms.untracked.vint32(
0, 200004, 100003, # File boundary before this causing merge
0, 100002, 100003,
0, 100002, 100004
),
expectedBeginLumiProd = cms.untracked.vint32(
0, 204, 103, # File boundary before this causing merge
0, 102, 103,
0, 102, 104
),
expectedEndLumiProd = cms.untracked.vint32(
0, 2004, 1003, # File boundary before this causing merge
0, 1002, 1003,
0, 1002, 1004
),
expectedBeginRunNew = cms.untracked.vint32(
0, 10002, 10003,
0, 10002, 10003,
0, 10002, 10003
),
expectedEndRunNew = cms.untracked.vint32(
0, 100002, 100003,
0, 100002, 100003,
0, 100002, 100003
),
expectedBeginLumiNew = cms.untracked.vint32(
0, 102, 103,
0, 102, 103,
0, 102, 103
),
expectedEndLumiNew = cms.untracked.vint32(
0, 1002, 1003,
0, 1002, 1003,
0, 1002, 1003
)
)

process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('testRunMergeMERGE6.root')
)

process.path1 = cms.Path(process.thingWithMergeProducer + process.test)
process.e = cms.EndPath(process.out)
116 changes: 116 additions & 0 deletions FWCore/Integration/test/testRunMergeNoRunLumiSort_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# A test of the noRunLumiSort configuration parameter
# where the input has non-contiguous event sequences
# from the same run.

# It is expected there are 8 warnings and 8 errors that
# print out while this runs related to merging.
# The test should pass with these errors and warnings.

import FWCore.ParameterSet.Config as cms

process = cms.Process("NORUNLUMISORT")

process.source = cms.Source("PoolSource",
fileNames = cms.untracked.vstring(
'file:testRunMergeMERGE6.root',
'file:testRunMergeMERGE6.root'
),
duplicateCheckMode = cms.untracked.string('checkEachRealDataFile'),
noRunLumiSort = cms.untracked.bool(True)
)

process.test2 = cms.EDAnalyzer('RunLumiEventAnalyzer',
verbose = cms.untracked.bool(True),
expectedRunLumiEvents = cms.untracked.vuint32(
1, 0, 0,
1, 1, 0,
1, 1, 11,
1, 1, 12,
1, 1, 13,
1, 1, 14,
1, 1, 15,
1, 1, 16,
1, 1, 17,
1, 1, 18,
1, 1, 19,
1, 1, 20,
1, 1, 21,
1, 1, 22,
1, 1, 23,
1, 1, 24,
1, 1, 25,
1, 1, 0,
1, 0, 0,
2, 0, 0,
2, 1, 0,
2, 1, 1,
2, 1, 2,
2, 1, 3,
2, 1, 4,
2, 1, 5,
2, 1, 0,
2, 0, 0,
1, 0, 0,
1, 1, 0,
1, 1, 1,
1, 1, 2,
1, 1, 3,
1, 1, 4,
1, 1, 5,
1, 1, 6,
1, 1, 7,
1, 1, 8,
1, 1, 9,
1, 1, 10
)
)
process.test2.expectedRunLumiEvents.extend([
1, 1, 11,
1, 1, 12,
1, 1, 13,
1, 1, 14,
1, 1, 15,
1, 1, 16,
1, 1, 17,
1, 1, 18,
1, 1, 19,
1, 1, 20,
1, 1, 21,
1, 1, 22,
1, 1, 23,
1, 1, 24,
1, 1, 25,
1, 1, 0,
1, 0, 0,
2, 0, 0,
2, 1, 0,
2, 1, 1,
2, 1, 2,
2, 1, 3,
2, 1, 4,
2, 1, 5,
2, 1, 0,
2, 0, 0,
1, 0, 0,
1, 1, 0,
1, 1, 1,
1, 1, 2,
1, 1, 3,
1, 1, 4,
1, 1, 5,
1, 1, 6,
1, 1, 7,
1, 1, 8,
1, 1, 9,
1, 1, 10,
1, 1, 0,
1, 0, 0,
])

process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('testRunMergeNoRunLumiSort.root')
)

process.path1 = cms.Path(process.test2)

process.e = cms.EndPath(process.out)
Loading

0 comments on commit ec0a81d

Please sign in to comment.