Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle event meta data changes in streamer files #44892

Merged
merged 4 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
#include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
#include "IOPool/Streamer/interface/StreamerInputModule.h"

typedef edm::StreamerInputModule<WatcherStreamFileReader> WatcherSource;
typedef edm::streamer::StreamerInputModule<WatcherStreamFileReader> WatcherSource;

#endif //WatcherSourceModule_H not defined
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef IOPool_Streamer_StreamerFileReader_h
#define IOPool_Streamer_StreamerFileReader_h
#ifndef CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h
#define CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h

#include "IOPool/Streamer/interface/InitMessage.h"
#include "IOPool/Streamer/interface/EventMessage.h"
Expand All @@ -19,7 +19,7 @@
* This protection is obviously not full proof, especially to transfer lag.
*/

namespace edm {
namespace edm::streamer {
class StreamerInputFile;
}

Expand All @@ -28,15 +28,16 @@ class WatcherStreamFileReader {
WatcherStreamFileReader(edm::ParameterSet const& pset);
~WatcherStreamFileReader();

const InitMsgView* getHeader();
const EventMsgView* getNextEvent();
const edm::streamer::InitMsgView* getHeader();
const edm::streamer::EventMsgView* getNextEvent();
const bool newHeader();

edm::StreamerInputFile* getInputFile();
edm::streamer::StreamerInputFile* getInputFile();

void closeFile();

private:
void moveJustReadFile();
/** Directory to look for streamer files
*/
std::string inputDir_;
Expand All @@ -59,7 +60,7 @@ class WatcherStreamFileReader {

/** Cached input file stream
*/
std::unique_ptr<edm::StreamerInputFile> streamerInputFile_;
std::unique_ptr<edm::streamer::StreamerInputFile> streamerInputFile_;

std::string fileName_;

Expand Down
27 changes: 13 additions & 14 deletions CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

//using namespace edm;
using namespace std;
using namespace edm::streamer;

//std::string WatcherStreamFileReader::fileName_;

Expand Down Expand Up @@ -153,13 +154,10 @@ WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)

WatcherStreamFileReader::~WatcherStreamFileReader() {}

const bool WatcherStreamFileReader::newHeader() {
edm::StreamerInputFile* inputFile = getInputFile();
return inputFile ? inputFile->newHeader() : false;
}
const bool WatcherStreamFileReader::newHeader() { return getInputFile() != nullptr; }

const InitMsgView* WatcherStreamFileReader::getHeader() {
edm::StreamerInputFile* inputFile = getInputFile();
StreamerInputFile* inputFile = getInputFile();

//TODO: shall better send an exception...
if (inputFile == nullptr) {
Expand All @@ -177,21 +175,20 @@ const InitMsgView* WatcherStreamFileReader::getHeader() {

const EventMsgView* WatcherStreamFileReader::getNextEvent() {
if (end_) {
closeFile();
moveJustReadFile();
return nullptr;
}

edm::StreamerInputFile* inputFile;

//go to next input file, till no new event is found
while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
closeFile();
StreamerInputFile* inputFile;
if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) {
moveJustReadFile();
return nullptr;
}

return inputFile == nullptr ? nullptr : inputFile->currentRecord();
}

edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
StreamerInputFile* WatcherStreamFileReader::getInputFile() {
char* lineptr = nullptr;
size_t n = 0;
static stringstream cmd;
Expand Down Expand Up @@ -379,7 +376,7 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
cout << "[WatcherSource " << now() << "]"
<< " Opening file " << fileName_ << "\n"
<< flush;
streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
streamerInputFile_ = std::make_unique<StreamerInputFile>(fileName_);

ofstream f(".watcherfile");
f << fileName_;
Expand All @@ -392,7 +389,9 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
return streamerInputFile_.get();
}

void WatcherStreamFileReader::closeFile() {
void WatcherStreamFileReader::closeFile() {}

void WatcherStreamFileReader::moveJustReadFile() {
rappoccio marked this conversation as resolved.
Show resolved Hide resolved
if (streamerInputFile_.get() == nullptr)
return;
//delete the streamer input file:
Expand Down
15 changes: 10 additions & 5 deletions CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,35 @@ echo "LOCAL_TEST_DIR = $SCRAM_TEST_PATH"
RC=0

mkdir inDir
echo "test padding"
cmsRun ${SCRAM_TEST_PATH}/streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
cp teststreamfile.dat teststreamfile.padding
mv teststreamfile.dat inDir/
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py with padding" $?
rm -rf inDir

mkdir inDir

echo "test original"
cmsRun ${SCRAM_TEST_PATH}/streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
cp teststreamfile.dat teststreamfile.original
mv teststreamfile.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py original" $?

echo "test original and alt"
rm watcherSourceToken
cp teststreamfile.original inDir/teststreamfile.dat
cmsRun ${SCRAM_TEST_PATH}/streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $?
mv teststreamfile_alt.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py >alt 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --alt >alt 2>&1 || die "cmsRun streamIn_cfg.py alt" $?
#timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInAlt_cfg.py > alt 2>&1 || die "cmsRun streamInAlt_cfg.py" $?

echo "test ext"
rm watcherSourceToken
cp teststreamfile.original inDir/teststreamfile.dat
cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
mv teststreamfile_ext.dat inDir
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > ext 2>&1 || die "cmsRun streamIn_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --ext > ext 2>&1 || die "cmsRun streamIn_cfg.py ext" $?
#timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInExt_cfg.py > ext 2>&1 || die "cmsRun streamInExt_cfg.py" $?

# echo "CHECKSUM = 1" > out
Expand Down
36 changes: 35 additions & 1 deletion CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import FWCore.ParameterSet.Config as cms

import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test streamer input')

parser.add_argument("--alt", help="Have filter succeed", action="store_true")
parser.add_argument("--ext", help="Switch the order of dependencies", action="store_true")

args = parser.parse_args()


process = cms.Process("TRANSFER")

import FWCore.Framework.test.cmsExceptionsFatal_cff
Expand All @@ -26,8 +37,31 @@
product_to_get = cms.string('m1')
)

ids = [cms.EventID(1,0,0), cms.EventID(1,1,0)]
for e in range(10123456789, 10123456839):
ids.append(cms.EventID(1,1,e))
if args.alt:
for e in range(15123456789, 15123456839):
ids.append(cms.EventID(1,1,e))

if args.ext:
ids.append(cms.EventID(1,1,0))
ids.append(cms.EventID(1,0,0))
ids.append(cms.EventID(1,0,0))
ids.append(cms.EventID(1,1,0))
for e in range(20123456789, 20123456839):
ids.append(cms.EventID(1,1,e))

ids.append(cms.EventID(1,1,0))
ids.append(cms.EventID(1,0,0))

process.check = cms.EDAnalyzer("RunLumiEventChecker",
eventSequence = cms.untracked.VEventID(ids)
)


process.out = cms.OutputModule("PoolOutputModule",
fileName = cms.untracked.string('myout.root')
)

process.end = cms.EndPath(process.a1*process.out)
process.end = cms.EndPath(process.a1*process.out*process.check)
49 changes: 42 additions & 7 deletions DQMServices/StreamerIO/plugins/DQMStreamerReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cctype>

namespace dqmservices {
using namespace edm::streamer;

DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
: StreamerInputSource(pset, desc),
Expand Down Expand Up @@ -81,17 +82,27 @@ namespace dqmservices {
fiterator_.logFileAction("DQMStreamerReader initialised.");
}

void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
deserializeAndMergeWithRegistry(msg, subsequent);
auto event = getEventMsg();
//file might be empty
if (not event)
return;
assert(event->isEventMetaData());
deserializeEventMetaData(*event);
updateEventMetaData();
}
void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
processedEventPerLs_ = 0;

std::string path = entry.get_data_path();

file_.lumi_ = entry;
file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
file_.streamFile_ = std::make_unique<StreamerInputFile>(path);

InitMsgView const* header = getHeaderMsg();
if (isFirstFile_) {
deserializeAndMergeWithRegistry(*header, false);
setupMetaData(*header, false);
}

// dump the list of HLT trigger name from the header
Expand Down Expand Up @@ -135,9 +146,14 @@ namespace dqmservices {
return;
}

if (artificialFileBoundary_) {
updateEventMetaData();
artificialFileBoundary_ = false;
return;
}
//Get header/init from reader
InitMsgView const* header = getHeaderMsg();
deserializeAndMergeWithRegistry(*header, true);
setupMetaData(*header, true);
}

bool DQMStreamerReader::openNextFileImp_() {
Expand Down Expand Up @@ -179,11 +195,11 @@ namespace dqmservices {

EventMsgView const* DQMStreamerReader::getEventMsg() {
auto next = file_.streamFile_->next();
if (edm::StreamerInputFile::Next::kFile == next) {
if (StreamerInputFile::Next::kFile == next) {
return nullptr;
}

if (edm::StreamerInputFile::Next::kStop == next) {
if (StreamerInputFile::Next::kStop == next) {
return nullptr;
}

Expand Down Expand Up @@ -285,6 +301,25 @@ namespace dqmservices {
// this means end of file, so close the file
closeFileImp_("eof");
} else {
//NOTE: at this point need to see if meta data checksum changed. If it did
// we need to issue a 'new File' transition
if (eview->isEventMetaData()) {
auto lastEventMetaData = presentEventMetaDataChecksum();
if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
deserializeEventMetaData(*eview);
artificialFileBoundary_ = true;
return nullptr;
} else {
//skipping
eview = getEventMsg();
assert((eview == nullptr) or (not eview->isEventMetaData()));
if (eview == nullptr) {
closeFileImp_("eof");
continue;
}
}
}

if (!acceptEvent(eview)) {
continue;
} else {
Expand All @@ -303,7 +338,7 @@ namespace dqmservices {
try {
EventMsgView const* eview = prepareNextEvent();
if (eview == nullptr) {
if (file_.streamFile_ and file_.streamFile_->newHeader()) {
if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
return Next::kFile;
}
return Next::kStop;
Expand Down Expand Up @@ -437,7 +472,7 @@ namespace dqmservices {
desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);

DQMFileIterator::fillDescription(desc);
edm::StreamerInputSource::fillDescription(desc);
StreamerInputSource::fillDescription(desc);
edm::EventSkipperByID::fillDescription(desc);

descriptions.add("source", desc);
Expand Down
21 changes: 10 additions & 11 deletions DQMServices/StreamerIO/plugins/DQMStreamerReader.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#ifndef DQMServices_StreamerIO_DQMStreamerReader_h
#define DQMServices_StreamerIO_DQMStreamerReader_h

#include "FWCore/ServiceRegistry/interface/Service.h"
#include "IOPool/Streamer/interface/StreamerInputSource.h"
#include "IOPool/Streamer/interface/StreamerInputFile.h"
#include "IOPool/Streamer/interface/MsgTools.h"

#include "DQMFileIterator.h"
#include "DQMMonitoringService.h"
#include "TriggerSelector.h"

#include <memory>
Expand All @@ -16,7 +14,7 @@

namespace dqmservices {

class DQMStreamerReader : public edm::StreamerInputSource {
class DQMStreamerReader : public edm::streamer::StreamerInputSource {
public:
DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc);
~DQMStreamerReader() override;
Expand All @@ -41,14 +39,15 @@ namespace dqmservices {

bool openNextFileImp_();

InitMsgView const* getHeaderMsg();
EventMsgView const* getEventMsg();
edm::streamer::InitMsgView const* getHeaderMsg();
edm::streamer::EventMsgView const* getEventMsg();

EventMsgView const* prepareNextEvent();
void setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent);
edm::streamer::EventMsgView const* prepareNextEvent();

bool isFirstFile_ = true;
bool prepareNextFile();
bool acceptEvent(const EventMsgView*);
bool acceptEvent(const edm::streamer::EventMsgView*);

DQMFileIterator fiterator_;
unsigned int processedEventPerLs_ = 0;
Expand All @@ -65,8 +64,11 @@ namespace dqmservices {
bool matchTriggerSel_ = false;
bool setMatchTriggerSel(std::vector<std::string> const& tnames);

//If the event meta data changes while reading a file, we need to
// cause a file transition to happen to allow synchronous update
bool artificialFileBoundary_ = false;
struct OpenFile {
std::unique_ptr<edm::StreamerInputFile> streamFile_;
std::unique_ptr<edm::streamer::StreamerInputFile> streamFile_;
DQMFileIterator::LumiEntry lumi_;

bool open() { return (streamFile_.get() != nullptr); }
Expand All @@ -75,9 +77,6 @@ namespace dqmservices {

std::shared_ptr<edm::EventSkipperByID> eventSkipperByID_;
std::shared_ptr<TriggerSelector> triggerSelector_;

/* this is for monitoring */
edm::Service<DQMMonitoringService> mon_;
};

} // namespace dqmservices
Expand Down
Loading