Skip to content

Commit

Permalink
Moved event meta-data storage in streamer files
Browse files Browse the repository at this point in the history
The event meta-data (e.g. BranchLists) is now added as an
EventMsg before the following EventMsg for which it applies.
This added EventMsg contains no data products. When seen,
the added EventMsg is handled as an artificial file boundary in
order to cause the needed stream synchronization.
This change is needed to handle how HLT merges parts of streamer
files which might have different meta-data.
  • Loading branch information
Dr15Jones committed May 2, 2024
1 parent bcbcf69 commit e3d9efe
Show file tree
Hide file tree
Showing 22 changed files with 367 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
#include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
#include "IOPool/Streamer/interface/StreamerInputModule.h"

typedef edm::StreamerInputModule<WatcherStreamFileReader> WatcherSource;
typedef edm::streamer::StreamerInputModule<WatcherStreamFileReader> WatcherSource;

#endif //WatcherSourceModule_H not defined
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef IOPool_Streamer_StreamerFileReader_h
#define IOPool_Streamer_StreamerFileReader_h
#ifndef CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h
#define CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h

#include "IOPool/Streamer/interface/InitMessage.h"
#include "IOPool/Streamer/interface/EventMessage.h"
Expand All @@ -19,7 +19,7 @@
* This protection is obviously not full proof, especially to transfer lag.
*/

namespace edm {
namespace edm::streamer {
class StreamerInputFile;
}

Expand All @@ -28,15 +28,16 @@ class WatcherStreamFileReader {
WatcherStreamFileReader(edm::ParameterSet const& pset);
~WatcherStreamFileReader();

const InitMsgView* getHeader();
const EventMsgView* getNextEvent();
const edm::streamer::InitMsgView* getHeader();
const edm::streamer::EventMsgView* getNextEvent();
const bool newHeader();

edm::StreamerInputFile* getInputFile();
edm::streamer::StreamerInputFile* getInputFile();

void closeFile();

private:
void moveJustReadFile();
/** Directory to look for streamer files
*/
std::string inputDir_;
Expand All @@ -59,7 +60,7 @@ class WatcherStreamFileReader {

/** Cached input file stream
*/
std::unique_ptr<edm::StreamerInputFile> streamerInputFile_;
std::unique_ptr<edm::streamer::StreamerInputFile> streamerInputFile_;

std::string fileName_;

Expand Down
26 changes: 14 additions & 12 deletions CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

//using namespace edm;
using namespace std;
using namespace edm::streamer;

//std::string WatcherStreamFileReader::fileName_;

Expand Down Expand Up @@ -154,12 +155,12 @@ WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)
WatcherStreamFileReader::~WatcherStreamFileReader() {}

const bool WatcherStreamFileReader::newHeader() {
edm::StreamerInputFile* inputFile = getInputFile();
return inputFile ? inputFile->newHeader() : false;
StreamerInputFile* inputFile = getInputFile();
return inputFile;
}

const InitMsgView* WatcherStreamFileReader::getHeader() {
edm::StreamerInputFile* inputFile = getInputFile();
StreamerInputFile* inputFile = getInputFile();

//TODO: shall better send an exception...
if (inputFile == nullptr) {
Expand All @@ -177,21 +178,20 @@ const InitMsgView* WatcherStreamFileReader::getHeader() {

const EventMsgView* WatcherStreamFileReader::getNextEvent() {
if (end_) {
closeFile();
moveJustReadFile();
return nullptr;
}

edm::StreamerInputFile* inputFile;

//go to next input file, till no new event is found
while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
closeFile();
StreamerInputFile* inputFile;
if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) {
moveJustReadFile();
return nullptr;
}

return inputFile == nullptr ? nullptr : inputFile->currentRecord();
}

edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
StreamerInputFile* WatcherStreamFileReader::getInputFile() {
char* lineptr = nullptr;
size_t n = 0;
static stringstream cmd;
Expand Down Expand Up @@ -379,7 +379,7 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
cout << "[WatcherSource " << now() << "]"
<< " Opening file " << fileName_ << "\n"
<< flush;
streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
streamerInputFile_ = std::make_unique<StreamerInputFile>(fileName_);

ofstream f(".watcherfile");
f << fileName_;
Expand All @@ -392,7 +392,9 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
return streamerInputFile_.get();
}

void WatcherStreamFileReader::closeFile() {
void WatcherStreamFileReader::closeFile() {}

void WatcherStreamFileReader::moveJustReadFile() {
if (streamerInputFile_.get() == nullptr)
return;
//delete the streamer input file:
Expand Down
15 changes: 10 additions & 5 deletions CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,35 @@ echo "LOCAL_TEST_DIR = $SCRAM_TEST_PATH"
RC=0

mkdir inDir
echo "test padding"
cmsRun ${SCRAM_TEST_PATH}/streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
cp teststreamfile.dat teststreamfile.padding
mv teststreamfile.dat inDir/
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py with padding" $?
rm -rf inDir

mkdir inDir

echo "test original"
cmsRun ${SCRAM_TEST_PATH}/streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
cp teststreamfile.dat teststreamfile.original
mv teststreamfile.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py original" $?

echo "test original and alt"
rm watcherSourceToken
cp teststreamfile.original inDir/teststreamfile.dat
cmsRun ${SCRAM_TEST_PATH}/streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $?
mv teststreamfile_alt.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py >alt 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --alt >alt 2>&1 || die "cmsRun streamIn_cfg.py alt" $?
#timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInAlt_cfg.py > alt 2>&1 || die "cmsRun streamInAlt_cfg.py" $?

echo "test ext"
rm watcherSourceToken
cp teststreamfile.original inDir/teststreamfile.dat
cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
mv teststreamfile_ext.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > ext 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --ext > ext 2>&1 || die "cmsRun streamIn_cfg.py ext" $?
#timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInExt_cfg.py > ext 2>&1 || die "cmsRun streamInExt_cfg.py" $?

# echo "CHECKSUM = 1" > out
Expand Down
36 changes: 35 additions & 1 deletion CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import FWCore.ParameterSet.Config as cms

import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test streamer input')

parser.add_argument("--alt", help="Have filter succeed", action="store_true")
parser.add_argument("--ext", help="Switch the order of dependencies", action="store_true")

args = parser.parse_args()


process = cms.Process("TRANSFER")

import FWCore.Framework.test.cmsExceptionsFatal_cff
Expand All @@ -26,8 +37,31 @@
product_to_get = cms.string('m1')
)

ids = [cms.EventID(1,0,0), cms.EventID(1,1,0)]
for e in range(10123456789, 10123456839):
ids.append(cms.EventID(1,1,e))
if args.alt:
for e in range(15123456789, 15123456839):
ids.append(cms.EventID(1,1,e))

if args.ext:
ids.append(cms.EventID(1,1,0))
ids.append(cms.EventID(1,0,0))
ids.append(cms.EventID(1,0,0))
ids.append(cms.EventID(1,1,0))
for e in range(20123456789, 20123456839):
ids.append(cms.EventID(1,1,e))

ids.append(cms.EventID(1,1,0))
ids.append(cms.EventID(1,0,0))

process.check = cms.EDAnalyzer("RunLumiEventChecker",
eventSequence = cms.untracked.VEventID(ids)
)


process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('myout.root')
)

process.end = cms.EndPath(process.a1*process.out)
process.end = cms.EndPath(process.a1*process.out*process.check)
36 changes: 33 additions & 3 deletions DQMServices/StreamerIO/plugins/DQMStreamerReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ namespace dqmservices {
fiterator_.logFileAction("DQMStreamerReader initialised.");
}

void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
deserializeAndMergeWithRegistry(msg, subsequent);
auto event = getEventMsg();
assert(event and isEventMetaData(*event));
deserializeEventMetaData(*event);
updateEventMetaData();
}
void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
processedEventPerLs_ = 0;

Expand All @@ -92,7 +99,7 @@ namespace dqmservices {

InitMsgView const* header = getHeaderMsg();
if (isFirstFile_) {
deserializeAndMergeWithRegistry(*header, false);
setupMetaData(*header, false);
}

// dump the list of HLT trigger name from the header
Expand Down Expand Up @@ -136,9 +143,14 @@ namespace dqmservices {
return;
}

if (artificialFileBoundary_) {
updateEventMetaData();
artificialFileBoundary_ = false;
return;
}
//Get header/init from reader
InitMsgView const* header = getHeaderMsg();
deserializeAndMergeWithRegistry(*header, true);
setupMetaData(*header, true);
}

bool DQMStreamerReader::openNextFileImp_() {
Expand Down Expand Up @@ -286,6 +298,24 @@ namespace dqmservices {
// this means end of file, so close the file
closeFileImp_("eof");
} else {
//NOTE: at this point need to see if meta data checksum changed. If it did
// we need to issue a 'new File' transition
if (isEventMetaData(*eview)) {
auto lastEventMetaData = presentEventMetaDataChecksum();
if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
deserializeEventMetaData(*eview);
artificialFileBoundary_ = true;
return nullptr;
} else {
//skipping
eview = getEventMsg();
if (eview == nullptr) {
closeFileImp_("eof");
continue;
}
}
}

if (!acceptEvent(eview)) {
continue;
} else {
Expand All @@ -304,7 +334,7 @@ namespace dqmservices {
try {
EventMsgView const* eview = prepareNextEvent();
if (eview == nullptr) {
if (file_.streamFile_ and file_.streamFile_->newHeader()) {
if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
return Next::kFile;
}
return Next::kStop;
Expand Down
9 changes: 4 additions & 5 deletions DQMServices/StreamerIO/plugins/DQMStreamerReader.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#ifndef DQMServices_StreamerIO_DQMStreamerReader_h
#define DQMServices_StreamerIO_DQMStreamerReader_h

#include "FWCore/ServiceRegistry/interface/Service.h"
#include "IOPool/Streamer/interface/StreamerInputSource.h"
#include "IOPool/Streamer/interface/StreamerInputFile.h"
#include "IOPool/Streamer/interface/MsgTools.h"

#include "DQMFileIterator.h"
#include "DQMMonitoringService.h"
#include "TriggerSelector.h"

#include <memory>
Expand Down Expand Up @@ -44,6 +42,7 @@ namespace dqmservices {
edm::streamer::InitMsgView const* getHeaderMsg();
edm::streamer::EventMsgView const* getEventMsg();

void setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent);
edm::streamer::EventMsgView const* prepareNextEvent();

bool isFirstFile_ = true;
Expand All @@ -65,6 +64,9 @@ namespace dqmservices {
bool matchTriggerSel_ = false;
bool setMatchTriggerSel(std::vector<std::string> const& tnames);

//If the event meta data changes while reading a file, we need to
// cause a file transition to happen to allow synchronous update
bool artificialFileBoundary_ = false;
struct OpenFile {
std::unique_ptr<edm::streamer::StreamerInputFile> streamFile_;
DQMFileIterator::LumiEntry lumi_;
Expand All @@ -75,9 +77,6 @@ namespace dqmservices {

std::shared_ptr<edm::EventSkipperByID> eventSkipperByID_;
std::shared_ptr<TriggerSelector> triggerSelector_;

/* this is for monitoring */
edm::Service<DQMMonitoringService> mon_;
};

} // namespace dqmservices
Expand Down
27 changes: 17 additions & 10 deletions DataFormats/Streamer/interface/StreamedProducts.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,40 @@ namespace edm {
SendEvent(EventAuxiliary const& aux,
ProcessHistory const& processHistory,
EventSelectionIDVector const& eventSelectionIDs,
BranchListIndexes const& branchListIndexes)
BranchListIndexes const& branchListIndexes,
BranchIDLists const& branchIDLists,
ThinnedAssociationsHelper const& thinnedAssociationsHelper,
uint32_t metaDataChecksum)
: aux_(aux),
processHistory_(processHistory),
eventSelectionIDs_(eventSelectionIDs),
branchListIndexes_(branchListIndexes),
products_() {}
branchIDLists_(branchIDLists),
thinnedAssociationsHelper_(thinnedAssociationsHelper),
products_(),
metaDataChecksum_(metaDataChecksum) {}
EventAuxiliary const& aux() const { return aux_; }
SendProds const& products() const { return products_; }
ProcessHistory const& processHistory() const { return processHistory_; }
EventSelectionIDVector const& eventSelectionIDs() const { return eventSelectionIDs_; }
BranchListIndexes const& branchListIndexes() const { return branchListIndexes_; }
//This will only hold values for EventMetaData messages
BranchIDLists const& branchIDLists() const { return branchIDLists_; }
//This will only hold values for EventMetaData messages
ThinnedAssociationsHelper const& thinnedAssociationsHelper() const { return thinnedAssociationsHelper_; }
//This is the adler32 checksum of the EventMetaData associated with this Event
uint32_t metaDataChecksum() const { return metaDataChecksum_; }
SendProds& products() { return products_; }

private:
EventAuxiliary aux_;
ProcessHistory processHistory_;
EventSelectionIDVector eventSelectionIDs_;
BranchListIndexes branchListIndexes_;
BranchIDLists branchIDLists_;
ThinnedAssociationsHelper thinnedAssociationsHelper_;
SendProds products_;
uint32_t metaDataChecksum_;

// other tables necessary for provenance lookup
};
Expand All @@ -105,21 +120,13 @@ namespace edm {
SendJobHeader() {}
SendDescs const& descs() const { return descs_; }
ParameterSetMap const& processParameterSet() const { return processParameterSet_; }
BranchIDLists const& branchIDLists() const { return branchIDLists_; }
ThinnedAssociationsHelper const& thinnedAssociationsHelper() const { return thinnedAssociationsHelper_; }
void push_back(BranchDescription const& bd) { descs_.push_back(bd); }
void setParameterSetMap(ParameterSetMap const& psetMap) { processParameterSet_ = psetMap; }
void setBranchIDLists(BranchIDLists const& bidlists) { branchIDLists_ = bidlists; }
void setThinnedAssociationsHelper(ThinnedAssociationsHelper const& v) { thinnedAssociationsHelper_ = v; }
void initializeTransients();

private:
SendDescs descs_;
ParameterSetMap processParameterSet_;
BranchIDLists branchIDLists_;
ThinnedAssociationsHelper thinnedAssociationsHelper_;
// trigger bit descriptions will be added here and permanent
// provenance values
};

} // namespace edm
Expand Down
Loading

0 comments on commit e3d9efe

Please sign in to comment.