From e3d9efe76dda6137dffb9d914315ce354741c653 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 24 Apr 2024 13:08:55 -0500 Subject: [PATCH] Moved event meta-data storage in streamer files The event meta-data (e.g. BranchLists) is now added as an EventMsg before the following EventMsg for which it applies. This added EventMsg contains no data products. When seen, the added EventMsg is handled as an artificial file boundary in order to cause the needed stream synchronization. This change is needed to handle how HLT merges parts of streamer files which might have different meta-data. --- .../interface/WatcherSource.h | 2 +- .../interface/WatcherStreamFileReader.h | 15 +++--- .../src/WatcherStreamFileReader.cc | 26 ++++++----- .../EcalLaserSorting/test/RunStreamer.sh | 15 ++++-- .../EcalLaserSorting/test/streamIn_cfg.py | 36 ++++++++++++++- .../StreamerIO/plugins/DQMStreamerReader.cc | 36 +++++++++++++-- .../StreamerIO/plugins/DQMStreamerReader.h | 9 ++-- .../Streamer/interface/StreamedProducts.h | 27 +++++++---- DataFormats/Streamer/src/classes_def.xml | 6 ++- .../plugins/GlobalEvFOutputModule.cc | 14 ++++-- IOPool/Streamer/interface/StreamSerializer.h | 24 +++++++--- .../Streamer/interface/StreamerInputModule.h | 46 +++++++++++++++---- .../Streamer/interface/StreamerInputSource.h | 20 ++++++-- .../interface/StreamerOutputModuleBase.h | 1 + .../interface/StreamerOutputModuleCommon.h | 14 +++++- IOPool/Streamer/src/StreamSerializer.cc | 43 ++++++++++++----- IOPool/Streamer/src/StreamerFileReader.cc | 43 +++++++++++++++-- IOPool/Streamer/src/StreamerFileReader.h | 2 + IOPool/Streamer/src/StreamerInputSource.cc | 45 +++++++++++++----- .../Streamer/src/StreamerOutputModuleBase.cc | 12 +++-- .../src/StreamerOutputModuleCommon.cc | 46 +++++++++++++------ ...RefProductIDMetadataConsistencyStreamer.sh | 2 +- 22 files changed, 367 insertions(+), 117 deletions(-) diff --git a/CalibCalorimetry/EcalLaserSorting/interface/WatcherSource.h b/CalibCalorimetry/EcalLaserSorting/interface/WatcherSource.h index 45cbeb8190ef7..791a830305b3f 100644 --- a/CalibCalorimetry/EcalLaserSorting/interface/WatcherSource.h +++ b/CalibCalorimetry/EcalLaserSorting/interface/WatcherSource.h @@ -4,6 +4,6 @@ #include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h" #include "IOPool/Streamer/interface/StreamerInputModule.h" -typedef edm::StreamerInputModule WatcherSource; +typedef edm::streamer::StreamerInputModule WatcherSource; #endif //WatcherSourceModule_H not defined diff --git a/CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h b/CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h index ce77f03ec9d82..a4f7870a72c7f 100644 --- a/CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h +++ b/CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h @@ -1,5 +1,5 @@ -#ifndef IOPool_Streamer_StreamerFileReader_h -#define IOPool_Streamer_StreamerFileReader_h +#ifndef CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h +#define CalibCalorimetry_EcalLaserSorting_WatcherStreamFileReader_h #include "IOPool/Streamer/interface/InitMessage.h" #include "IOPool/Streamer/interface/EventMessage.h" @@ -19,7 +19,7 @@ * This protection is obviously not full proof, especially to transfer lag. */ -namespace edm { +namespace edm::streamer { class StreamerInputFile; } @@ -28,15 +28,16 @@ class WatcherStreamFileReader { WatcherStreamFileReader(edm::ParameterSet const& pset); ~WatcherStreamFileReader(); - const InitMsgView* getHeader(); - const EventMsgView* getNextEvent(); + const edm::streamer::InitMsgView* getHeader(); + const edm::streamer::EventMsgView* getNextEvent(); const bool newHeader(); - edm::StreamerInputFile* getInputFile(); + edm::streamer::StreamerInputFile* getInputFile(); void closeFile(); private: + void moveJustReadFile(); /** Directory to look for streamer files */ std::string inputDir_; @@ -59,7 +60,7 @@ class WatcherStreamFileReader { /** Cached input file stream */ - std::unique_ptr streamerInputFile_; + std::unique_ptr streamerInputFile_; std::string fileName_; diff --git a/CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc b/CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc index 262ebe485333e..0971d5a342907 100644 --- a/CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc +++ b/CalibCalorimetry/EcalLaserSorting/src/WatcherStreamFileReader.cc @@ -19,6 +19,7 @@ //using namespace edm; using namespace std; +using namespace edm::streamer; //std::string WatcherStreamFileReader::fileName_; @@ -154,12 +155,12 @@ WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset) WatcherStreamFileReader::~WatcherStreamFileReader() {} const bool WatcherStreamFileReader::newHeader() { - edm::StreamerInputFile* inputFile = getInputFile(); - return inputFile ? inputFile->newHeader() : false; + StreamerInputFile* inputFile = getInputFile(); + return inputFile; } const InitMsgView* WatcherStreamFileReader::getHeader() { - edm::StreamerInputFile* inputFile = getInputFile(); + StreamerInputFile* inputFile = getInputFile(); //TODO: shall better send an exception... if (inputFile == nullptr) { @@ -177,21 +178,20 @@ const InitMsgView* WatcherStreamFileReader::getHeader() { const EventMsgView* WatcherStreamFileReader::getNextEvent() { if (end_) { - closeFile(); + moveJustReadFile(); return nullptr; } - edm::StreamerInputFile* inputFile; - - //go to next input file, till no new event is found - while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) { - closeFile(); + StreamerInputFile* inputFile; + if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) { + moveJustReadFile(); + return nullptr; } return inputFile == nullptr ? nullptr : inputFile->currentRecord(); } -edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() { +StreamerInputFile* WatcherStreamFileReader::getInputFile() { char* lineptr = nullptr; size_t n = 0; static stringstream cmd; @@ -379,7 +379,7 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() { cout << "[WatcherSource " << now() << "]" << " Opening file " << fileName_ << "\n" << flush; - streamerInputFile_ = std::make_unique(fileName_); + streamerInputFile_ = std::make_unique(fileName_); ofstream f(".watcherfile"); f << fileName_; @@ -392,7 +392,9 @@ edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() { return streamerInputFile_.get(); } -void WatcherStreamFileReader::closeFile() { +void WatcherStreamFileReader::closeFile() {} + +void WatcherStreamFileReader::moveJustReadFile() { if (streamerInputFile_.get() == nullptr) return; //delete the streamer input file: diff --git a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh index f404bfdc23298..0dd340c27dbc7 100755 --- a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh +++ b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh @@ -10,30 +10,35 @@ echo "LOCAL_TEST_DIR = $SCRAM_TEST_PATH" RC=0 mkdir inDir +echo "test padding" cmsRun ${SCRAM_TEST_PATH}/streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? cp teststreamfile.dat teststreamfile.padding mv teststreamfile.dat inDir/ -timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py with padding" $? rm -rf inDir mkdir inDir + +echo "test original" cmsRun ${SCRAM_TEST_PATH}/streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $? cp teststreamfile.dat teststreamfile.original mv teststreamfile.dat inDir -timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > in 2>&1 || die "cmsRun streamIn_cfg.py original" $? +echo "test original and alt" rm watcherSourceToken cp teststreamfile.original inDir/teststreamfile.dat cmsRun ${SCRAM_TEST_PATH}/streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $? mv teststreamfile_alt.dat inDir -timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py >alt 2>&1 || die "cmsRun streamIn_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --alt >alt 2>&1 || die "cmsRun streamIn_cfg.py alt" $? #timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInAlt_cfg.py > alt 2>&1 || die "cmsRun streamInAlt_cfg.py" $? +echo "test ext" rm watcherSourceToken cp teststreamfile.original inDir/teststreamfile.dat -cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $? +cmsRun ${SCRAM_TEST_PATH}/streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $? mv teststreamfile_ext.dat inDir -timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py > ext 2>&1 || die "cmsRun streamIn_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamIn_cfg.py --ext > ext 2>&1 || die "cmsRun streamIn_cfg.py ext" $? #timeout --signal SIGTERM 180 cmsRun ${SCRAM_TEST_PATH}/streamInExt_cfg.py > ext 2>&1 || die "cmsRun streamInExt_cfg.py" $? # echo "CHECKSUM = 1" > out diff --git a/CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py b/CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py index 03a986f7a1052..ec6d8e24671c7 100644 --- a/CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py +++ b/CalibCalorimetry/EcalLaserSorting/test/streamIn_cfg.py @@ -1,5 +1,16 @@ import FWCore.ParameterSet.Config as cms +import argparse +import sys + +parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test streamer input') + +parser.add_argument("--alt", help="Have filter succeed", action="store_true") +parser.add_argument("--ext", help="Switch the order of dependencies", action="store_true") + +args = parser.parse_args() + + process = cms.Process("TRANSFER") import FWCore.Framework.test.cmsExceptionsFatal_cff @@ -26,8 +37,31 @@ product_to_get = cms.string('m1') ) +ids = [cms.EventID(1,0,0), cms.EventID(1,1,0)] +for e in range(10123456789, 10123456839): + ids.append(cms.EventID(1,1,e)) +if args.alt: + for e in range(15123456789, 15123456839): + ids.append(cms.EventID(1,1,e)) + +if args.ext: + ids.append(cms.EventID(1,1,0)) + ids.append(cms.EventID(1,0,0)) + ids.append(cms.EventID(1,0,0)) + ids.append(cms.EventID(1,1,0)) + for e in range(20123456789, 20123456839): + ids.append(cms.EventID(1,1,e)) + +ids.append(cms.EventID(1,1,0)) +ids.append(cms.EventID(1,0,0)) + +process.check = cms.EDAnalyzer("RunLumiEventChecker", + eventSequence = cms.untracked.VEventID(ids) +) + + process.out = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string('myout.root') ) -process.end = cms.EndPath(process.a1*process.out) +process.end = cms.EndPath(process.a1*process.out*process.check) diff --git a/DQMServices/StreamerIO/plugins/DQMStreamerReader.cc b/DQMServices/StreamerIO/plugins/DQMStreamerReader.cc index 97aef9ff623b0..08584f6ee3d1d 100644 --- a/DQMServices/StreamerIO/plugins/DQMStreamerReader.cc +++ b/DQMServices/StreamerIO/plugins/DQMStreamerReader.cc @@ -82,6 +82,13 @@ namespace dqmservices { fiterator_.logFileAction("DQMStreamerReader initialised."); } + void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) { + deserializeAndMergeWithRegistry(msg, subsequent); + auto event = getEventMsg(); + assert(event and isEventMetaData(*event)); + deserializeEventMetaData(*event); + updateEventMetaData(); + } void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) { processedEventPerLs_ = 0; @@ -92,7 +99,7 @@ namespace dqmservices { InitMsgView const* header = getHeaderMsg(); if (isFirstFile_) { - deserializeAndMergeWithRegistry(*header, false); + setupMetaData(*header, false); } // dump the list of HLT trigger name from the header @@ -136,9 +143,14 @@ namespace dqmservices { return; } + if (artificialFileBoundary_) { + updateEventMetaData(); + artificialFileBoundary_ = false; + return; + } //Get header/init from reader InitMsgView const* header = getHeaderMsg(); - deserializeAndMergeWithRegistry(*header, true); + setupMetaData(*header, true); } bool DQMStreamerReader::openNextFileImp_() { @@ -286,6 +298,24 @@ namespace dqmservices { // this means end of file, so close the file closeFileImp_("eof"); } else { + //NOTE: at this point need to see if meta data checksum changed. If it did + // we need to issue a 'new File' transition + if (isEventMetaData(*eview)) { + auto lastEventMetaData = presentEventMetaDataChecksum(); + if (eventMetaDataChecksum(*eview) != lastEventMetaData) { + deserializeEventMetaData(*eview); + artificialFileBoundary_ = true; + return nullptr; + } else { + //skipping + eview = getEventMsg(); + if (eview == nullptr) { + closeFileImp_("eof"); + continue; + } + } + } + if (!acceptEvent(eview)) { continue; } else { @@ -304,7 +334,7 @@ namespace dqmservices { try { EventMsgView const* eview = prepareNextEvent(); if (eview == nullptr) { - if (file_.streamFile_ and file_.streamFile_->newHeader()) { + if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) { return Next::kFile; } return Next::kStop; diff --git a/DQMServices/StreamerIO/plugins/DQMStreamerReader.h b/DQMServices/StreamerIO/plugins/DQMStreamerReader.h index 547f1f8cbfa17..bf13f51784fb1 100644 --- a/DQMServices/StreamerIO/plugins/DQMStreamerReader.h +++ b/DQMServices/StreamerIO/plugins/DQMStreamerReader.h @@ -1,13 +1,11 @@ #ifndef DQMServices_StreamerIO_DQMStreamerReader_h #define DQMServices_StreamerIO_DQMStreamerReader_h -#include "FWCore/ServiceRegistry/interface/Service.h" #include "IOPool/Streamer/interface/StreamerInputSource.h" #include "IOPool/Streamer/interface/StreamerInputFile.h" #include "IOPool/Streamer/interface/MsgTools.h" #include "DQMFileIterator.h" -#include "DQMMonitoringService.h" #include "TriggerSelector.h" #include @@ -44,6 +42,7 @@ namespace dqmservices { edm::streamer::InitMsgView const* getHeaderMsg(); edm::streamer::EventMsgView const* getEventMsg(); + void setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent); edm::streamer::EventMsgView const* prepareNextEvent(); bool isFirstFile_ = true; @@ -65,6 +64,9 @@ namespace dqmservices { bool matchTriggerSel_ = false; bool setMatchTriggerSel(std::vector const& tnames); + //If the event meta data changes while reading a file, we need to + // cause a file transition to happen to allow synchronous update + bool artificialFileBoundary_ = false; struct OpenFile { std::unique_ptr streamFile_; DQMFileIterator::LumiEntry lumi_; @@ -75,9 +77,6 @@ namespace dqmservices { std::shared_ptr eventSkipperByID_; std::shared_ptr triggerSelector_; - - /* this is for monitoring */ - edm::Service mon_; }; } // namespace dqmservices diff --git a/DataFormats/Streamer/interface/StreamedProducts.h b/DataFormats/Streamer/interface/StreamedProducts.h index 2bc6e2a2d64e4..926b99f16ac00 100644 --- a/DataFormats/Streamer/interface/StreamedProducts.h +++ b/DataFormats/Streamer/interface/StreamedProducts.h @@ -74,17 +74,29 @@ namespace edm { SendEvent(EventAuxiliary const& aux, ProcessHistory const& processHistory, EventSelectionIDVector const& eventSelectionIDs, - BranchListIndexes const& branchListIndexes) + BranchListIndexes const& branchListIndexes, + BranchIDLists const& branchIDLists, + ThinnedAssociationsHelper const& thinnedAssociationsHelper, + uint32_t metaDataChecksum) : aux_(aux), processHistory_(processHistory), eventSelectionIDs_(eventSelectionIDs), branchListIndexes_(branchListIndexes), - products_() {} + branchIDLists_(branchIDLists), + thinnedAssociationsHelper_(thinnedAssociationsHelper), + products_(), + metaDataChecksum_(metaDataChecksum) {} EventAuxiliary const& aux() const { return aux_; } SendProds const& products() const { return products_; } ProcessHistory const& processHistory() const { return processHistory_; } EventSelectionIDVector const& eventSelectionIDs() const { return eventSelectionIDs_; } BranchListIndexes const& branchListIndexes() const { return branchListIndexes_; } + //This will only hold values for EventMetaData messages + BranchIDLists const& branchIDLists() const { return branchIDLists_; } + //This will only hold values for EventMetaData messages + ThinnedAssociationsHelper const& thinnedAssociationsHelper() const { return thinnedAssociationsHelper_; } + //This is the adler32 checksum of the EventMetaData associated with this Event + uint32_t metaDataChecksum() const { return metaDataChecksum_; } SendProds& products() { return products_; } private: @@ -92,7 +104,10 @@ namespace edm { ProcessHistory processHistory_; EventSelectionIDVector eventSelectionIDs_; BranchListIndexes branchListIndexes_; + BranchIDLists branchIDLists_; + ThinnedAssociationsHelper thinnedAssociationsHelper_; SendProds products_; + uint32_t metaDataChecksum_; // other tables necessary for provenance lookup }; @@ -105,21 +120,13 @@ namespace edm { SendJobHeader() {} SendDescs const& descs() const { return descs_; } ParameterSetMap const& processParameterSet() const { return processParameterSet_; } - BranchIDLists const& branchIDLists() const { return branchIDLists_; } - ThinnedAssociationsHelper const& thinnedAssociationsHelper() const { return thinnedAssociationsHelper_; } void push_back(BranchDescription const& bd) { descs_.push_back(bd); } void setParameterSetMap(ParameterSetMap const& psetMap) { processParameterSet_ = psetMap; } - void setBranchIDLists(BranchIDLists const& bidlists) { branchIDLists_ = bidlists; } - void setThinnedAssociationsHelper(ThinnedAssociationsHelper const& v) { thinnedAssociationsHelper_ = v; } void initializeTransients(); private: SendDescs descs_; ParameterSetMap processParameterSet_; - BranchIDLists branchIDLists_; - ThinnedAssociationsHelper thinnedAssociationsHelper_; - // trigger bit descriptions will be added here and permanent - // provenance values }; } // namespace edm diff --git a/DataFormats/Streamer/src/classes_def.xml b/DataFormats/Streamer/src/classes_def.xml index 9451393a99d33..e0b30db038f98 100644 --- a/DataFormats/Streamer/src/classes_def.xml +++ b/DataFormats/Streamer/src/classes_def.xml @@ -7,10 +7,12 @@ - + + - + + diff --git a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc index 6bbf40317ebae..76e51ce04ef26 100644 --- a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc +++ b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc @@ -359,14 +359,11 @@ namespace evf { StreamerOutputFile stream_writer_preamble(openIniFileName); uint32 preamble_adler32 = 1; - edm::BranchIDLists const* bidlPtr = branchIDLists(); auto psetMapHandle = run.getHandle(psetToken_); std::unique_ptr init_message = streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(), - *bidlPtr, - *thinnedAssociationsHelper(), OutputModule::processName(), description().moduleLabel(), moduleDescription().mainParameterSetID(), @@ -430,7 +427,16 @@ namespace evf { edm::LuminosityBlockForOutput const& iLB) const { auto openDatFilePath = edm::Service()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_); - return std::make_shared(openDatFilePath, iLB.luminosityBlock()); + auto ret = std::make_shared(openDatFilePath, iLB.luminosityBlock()); + + StreamerOutputModuleCommon streamerCommon( + commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel()); + + auto msg = streamerCommon.serializeEventMetaData( + *streamerCommon.getSerializerBuffer(), *branchIDLists(), *thinnedAssociationsHelper()); + + ret->doOutputEvent(*msg); + return ret; } void GlobalEvFOutputModule::acquire(edm::StreamID id, diff --git a/IOPool/Streamer/interface/StreamSerializer.h b/IOPool/Streamer/interface/StreamSerializer.h index 2169b0cb675c8..14b85e366b06c 100644 --- a/IOPool/Streamer/interface/StreamSerializer.h +++ b/IOPool/Streamer/interface/StreamSerializer.h @@ -75,22 +75,26 @@ namespace edm::streamer { public: StreamSerializer(SelectedProducts const *selections); - int serializeRegistry(SerializeDataBuffer &data_buffer, - const BranchIDLists &branchIDLists, - ThinnedAssociationsHelper const &thinnedAssociationsHelper); + int serializeRegistry(SerializeDataBuffer &data_buffer); - int serializeRegistry(SerializeDataBuffer &data_buffer, - const BranchIDLists &branchIDLists, - ThinnedAssociationsHelper const &thinnedAssociationsHelper, - SendJobHeader::ParameterSetMap const &psetMap); + int serializeRegistry(SerializeDataBuffer &data_buffer, SendJobHeader::ParameterSetMap const &psetMap); int serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, + uint32_t metaDataChecksum, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const; + ///data_buffer.adler32_chksum_ is the meta data checksum to pass to subsequent events + int serializeEventMetaData(SerializeDataBuffer &data_buffer, + const BranchIDLists &branchIDLists, + ThinnedAssociationsHelper const &thinnedAssociationsHelper, + StreamerCompressionAlgo compressionAlgo, + int compression_level, + unsigned int reserveSize) const; + /** * Compresses the data in the specified input buffer into the * specified output buffer. Returns the size of the compressed data @@ -117,6 +121,12 @@ namespace edm::streamer { bool addHeader = true); private: + int serializeEventCommon(SerializeDataBuffer &data_buffer, + edm::SendEvent const &iEvent, + StreamerCompressionAlgo compressionAlgo, + int compression_level, + unsigned int reserveSize) const; + SelectedProducts const *selections_; edm::propagate_const tc_; }; diff --git a/IOPool/Streamer/interface/StreamerInputModule.h b/IOPool/Streamer/interface/StreamerInputModule.h index 360ee89c66624..597c25491ac16 100644 --- a/IOPool/Streamer/interface/StreamerInputModule.h +++ b/IOPool/Streamer/interface/StreamerInputModule.h @@ -30,24 +30,49 @@ namespace edm::streamer { private: void genuineCloseFile() override { + if (didArtificialFile_) { + didArtificialFile_ = false; + + return; + } if (pr_.get() != nullptr) pr_->closeFile(); } + void setupMetaData() { + InitMsgView const* header = pr_->getHeader(); + assert(header); + deserializeAndMergeWithRegistry(*header); + + //NOTE: should read first Event to get the meta data + auto eview = pr_->getNextEvent(); + assert(eview); + assert(isEventMetaData(*eview)); + deserializeEventMetaData(*eview); + updateEventMetaData(); + } + void genuineReadFile() override { if (isFirstFile_) { isFirstFile_ = false; return; } - InitMsgView const* header = pr_->getHeader(); - deserializeAndMergeWithRegistry(*header); + if (didArtificialFile_) { + //update the event meta data + didArtificialFile_ = false; + updateEventMetaData(); + + return; + } + setupMetaData(); } Next checkNext() override; edm::propagate_const> pr_; bool isFirstFile_ = true; + bool didArtificialFile_ = false; }; //end-of-class-def template @@ -59,21 +84,26 @@ namespace edm::streamer { //prod_reg_(&productRegistry()), pr_(new Producer(pset)) { //Get header/init from Producer - InitMsgView const* header = pr_->getHeader(); - deserializeAndMergeWithRegistry(*header); + setupMetaData(); } template StreamerInputSource::Next StreamerInputModule::checkNext() { EventMsgView const* eview = pr_->getNextEvent(); - if (pr_->newHeader()) { - FDEBUG(6) << "A new file has been opened and we must compare Headers here !!" << std::endl; - return Next::kFile; - } if (eview == nullptr) { + if (pr_->newHeader()) { + FDEBUG(6) << "A new file has been opened and we must compare Headers here !!" << std::endl; + return Next::kFile; + } return Next::kStop; } + if (isEventMetaData(*eview)) { + //we lie and say there is a new file since we need to synchronize to update the meta data + deserializeEventMetaData(*eview); + didArtificialFile_ = true; + return Next::kFile; + } deserializeEvent(*eview); return Next::kEvent; } diff --git a/IOPool/Streamer/interface/StreamerInputSource.h b/IOPool/Streamer/interface/StreamerInputSource.h index 2d7354b94b2e5..e81d621ac133b 100644 --- a/IOPool/Streamer/interface/StreamerInputSource.h +++ b/IOPool/Streamer/interface/StreamerInputSource.h @@ -40,13 +40,20 @@ namespace edm::streamer { void deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent = false); + //Checks to see if eventView is a regular event (returns false) or a meta data event (true) + bool isEventMetaData(EventMsgView const& eventView) const; + //If eventView is a meta data event then this returns its checksum + uint32_t eventMetaDataChecksum(EventMsgView const& eventView) const; + //Should be called right after this message has been read + void deserializeEventMetaData(EventMsgView const& eventView); void deserializeEvent(EventMsgView const& eventView); - static void mergeIntoRegistry(SendJobHeader const& header, - ProductRegistry&, - BranchIDListHelper&, - ThinnedAssociationsHelper&, - bool subsequent); + uint32_t presentEventMetaDataChecksum() const { return eventMetaDataChecksum_; } + //This can only be called during a new file transition as it updates state that requires + // framework synchronization. + void updateEventMetaData(); + + static void mergeIntoRegistry(SendJobHeader const& header, ProductRegistry&, bool subsequent); /** * Detect if buffer starts with "XZ\0" which means it is compressed in LZMA format @@ -89,6 +96,8 @@ namespace edm::streamer { void resetAfterEndRun(); private: + void deserializeEventCommon(EventMsgView const& eventView, bool isMetaData); + class EventPrincipalHolder : public EDProductGetter { public: EventPrincipalHolder(); @@ -125,6 +134,7 @@ namespace edm::streamer { std::string processName_; unsigned int protocolVersion_; + uint32_t eventMetaDataChecksum_ = 0; }; //end-of-class-def } // namespace edm::streamer diff --git a/IOPool/Streamer/interface/StreamerOutputModuleBase.h b/IOPool/Streamer/interface/StreamerOutputModuleBase.h index 6c88edecfbe26..eedade18f4ac0 100644 --- a/IOPool/Streamer/interface/StreamerOutputModuleBase.h +++ b/IOPool/Streamer/interface/StreamerOutputModuleBase.h @@ -45,6 +45,7 @@ namespace edm { private: edm::EDGetTokenT trToken_; edm::EDGetTokenT psetToken_; + bool lastCallWasBeginRun_ = false; }; //end-of-class-def } // namespace streamer diff --git a/IOPool/Streamer/interface/StreamerOutputModuleCommon.h b/IOPool/Streamer/interface/StreamerOutputModuleCommon.h index 165248c08b75e..a0b3f4e35338d 100644 --- a/IOPool/Streamer/interface/StreamerOutputModuleCommon.h +++ b/IOPool/Streamer/interface/StreamerOutputModuleCommon.h @@ -45,13 +45,15 @@ namespace edm { static void fillDescription(ParameterSetDescription& desc); std::unique_ptr serializeRegistry(SerializeDataBuffer& sbuf, - BranchIDLists const& branchLists, - ThinnedAssociationsHelper const& helper, std::string const& processName, std::string const& moduleLabel, ParameterSetID const& toplevel, SendJobHeader::ParameterSetMap const* psetMap); + std::unique_ptr serializeEventMetaData(SerializeDataBuffer& sbuf, + BranchIDLists const& branchLists, + ThinnedAssociationsHelper const& helper); + std::unique_ptr serializeEvent(SerializeDataBuffer& sbuf, EventForOutput const& e, Handle const& triggerResults, @@ -63,6 +65,13 @@ namespace edm { std::unique_ptr serializerBuffer_; private: + std::unique_ptr serializeEventCommon(uint32 run, + uint32 lumi, + uint64 event, + std::vector hltbits, + unsigned int hltsize, + SerializeDataBuffer& sbuf); + void setHltMask(EventForOutput const& e, Handle const& triggerResults, std::vector& hltbits) const; @@ -86,6 +95,7 @@ namespace edm { Strings hltTriggerSelections_; uint32 outputModuleId_; + uint32_t eventMetaDataChecksum_ = 0; }; //end-of-class-def } // namespace streamer } // namespace edm diff --git a/IOPool/Streamer/src/StreamSerializer.cc b/IOPool/Streamer/src/StreamSerializer.cc index 0ac0993a1c897..545934e200116 100644 --- a/IOPool/Streamer/src/StreamSerializer.cc +++ b/IOPool/Streamer/src/StreamSerializer.cc @@ -42,17 +42,13 @@ namespace edm::streamer { * Serializes the product registry (that was specified to the constructor) * into the specified InitMessage. */ - int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer, - const BranchIDLists &branchIDLists, - ThinnedAssociationsHelper const &thinnedAssociationsHelper) { + int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer) { SendJobHeader::ParameterSetMap psetMap; pset::Registry::instance()->fillMap(psetMap); - return serializeRegistry(data_buffer, branchIDLists, thinnedAssociationsHelper, psetMap); + return serializeRegistry(data_buffer, psetMap); } int StreamSerializer::serializeRegistry(SerializeDataBuffer &data_buffer, - const BranchIDLists &branchIDLists, - ThinnedAssociationsHelper const &thinnedAssociationsHelper, SendJobHeader::ParameterSetMap const &psetMap) { FDEBUG(6) << "StreamSerializer::serializeRegistry" << std::endl; SendJobHeader sd; @@ -64,8 +60,6 @@ namespace edm::streamer { FDEBUG(9) << "StreamOutput got product = " << selection.first->className() << std::endl; } Service reg; - sd.setBranchIDLists(branchIDLists); - sd.setThinnedAssociationsHelper(thinnedAssociationsHelper); sd.setParameterSetMap(psetMap); data_buffer.rootbuf_.Reset(); @@ -132,12 +126,19 @@ namespace edm::streamer { int StreamSerializer::serializeEvent(SerializeDataBuffer &data_buffer, EventForOutput const &event, ParameterSetID const &selectorConfig, + uint32_t metaDataChecksum, StreamerCompressionAlgo compressionAlgo, int compression_level, unsigned int reserveSize) const { EventSelectionIDVector selectionIDs = event.eventSelectionIDs(); selectionIDs.push_back(selectorConfig); - SendEvent se(event.eventAuxiliary(), event.processHistory(), selectionIDs, event.branchListIndexes()); + SendEvent se(event.eventAuxiliary(), + event.processHistory(), + selectionIDs, + event.branchListIndexes(), + {}, + {}, + metaDataChecksum); // Loop over EDProducts, fill the provenance, and write. @@ -172,7 +173,25 @@ namespace edm::streamer { } } } + return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize); + } + + int StreamSerializer::serializeEventMetaData(SerializeDataBuffer &data_buffer, + const BranchIDLists &branchIDLists, + ThinnedAssociationsHelper const &thinnedAssociationsHelper, + StreamerCompressionAlgo compressionAlgo, + int compression_level, + unsigned int reserveSize) const { + SendEvent se({}, {}, {}, {}, branchIDLists, thinnedAssociationsHelper, 0); + + return serializeEventCommon(data_buffer, se, compressionAlgo, compression_level, reserveSize); + } + int StreamSerializer::serializeEventCommon(SerializeDataBuffer &data_buffer, + edm::SendEvent const &se, + StreamerCompressionAlgo compressionAlgo, + int compression_level, + unsigned int reserveSize) const { data_buffer.rootbuf_.Reset(); RootDebug tracer(10, 10); @@ -182,7 +201,7 @@ namespace edm::streamer { case 0: // failure { throw cms::Exception("StreamTranslation", "Event serialization failed") - << "StreamSerializer failed to serialize event: " << event.id(); + << "StreamSerializer failed to serialize event: " << se.aux().id(); break; } case 1: // succcess @@ -191,14 +210,14 @@ namespace edm::streamer { { throw cms::Exception("StreamTranslation", "Event serialization truncated") << "StreamSerializer module attempted to serialize an event\n" - << "that is to big for the allocated buffers: " << event.id(); + << "that is to big for the allocated buffers: " << se.aux().id(); break; } default: // unknown { throw cms::Exception("StreamTranslation", "Event serialization failed") << "StreamSerializer module got an unknown error code\n" - << " while attempting to serialize event: " << event.id(); + << " while attempting to serialize event: " << se.aux().id(); break; } } diff --git a/IOPool/Streamer/src/StreamerFileReader.cc b/IOPool/Streamer/src/StreamerFileReader.cc index 32f58476f7002..258da67c2e78e 100644 --- a/IOPool/Streamer/src/StreamerFileReader.cc +++ b/IOPool/Streamer/src/StreamerFileReader.cc @@ -10,6 +10,7 @@ #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "FWCore/Sources/interface/EventSkipperByID.h" +#include namespace edm::streamer { StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc) @@ -39,13 +40,22 @@ namespace edm::streamer { << "No fileNames were specified\n"; } isFirstFile_ = true; - InitMsgView const* header = getHeader(); - deserializeAndMergeWithRegistry(*header, false); + updateMetaData(false); if (initialNumberOfEventsToSkip_) { skip(initialNumberOfEventsToSkip_); } } + void StreamerFileReader::updateMetaData(bool subsequent) { + InitMsgView const* header = getHeader(); + deserializeAndMergeWithRegistry(*header, subsequent); + //NOTE: should read first Event to get the meta data and then set 'artificial file' + auto eview = getNextEvent(); + assert(eview and isEventMetaData(*eview)); + deserializeEventMetaData(*eview); + updateEventMetaData(); + } + StreamerFileReader::Next StreamerFileReader::checkNext() { EventMsgView const* eview = getNextEvent(); @@ -55,6 +65,23 @@ namespace edm::streamer { } return Next::kStop; } + if (isEventMetaData(*eview)) { + if (presentEventMetaDataChecksum() != eventMetaDataChecksum(*eview)) { + //we lie and say there is a new file since we need to synchronize to update the meta data + didArtificialFile_ = true; + deserializeEventMetaData(*eview); + return Next::kFile; + } else { + //skip this meta data + eview = getNextEvent(); + if (eview == nullptr) { + if (newHeader()) { + return Next::kFile; + } + return Next::kStop; + } + } + } deserializeEvent(*eview); return Next::kEvent; } @@ -73,6 +100,9 @@ namespace edm::streamer { } void StreamerFileReader::genuineCloseFile() { + if (didArtificialFile_) { + return; + } if (streamReader_.get() != nullptr) streamReader_->closeStreamerFile(); } @@ -83,12 +113,17 @@ namespace edm::streamer { isFirstFile_ = false; return; } + if (didArtificialFile_) { + //update the event meta data + didArtificialFile_ = false; + updateEventMetaData(); + return; + } streamReader_->openNextFile(); // FDEBUG(6) << "A new file has been opened and we must compare Headers here !!" << std::endl; // A new file has been opened and we must compare Heraders here !! //Get header/init from reader - InitMsgView const* header = getHeader(); - deserializeAndMergeWithRegistry(*header, true); + updateMetaData(true); } bool StreamerFileReader::newHeader() { return streamReader_->newHeader(); } diff --git a/IOPool/Streamer/src/StreamerFileReader.h b/IOPool/Streamer/src/StreamerFileReader.h index 3661a6440535e..0d734e458082e 100644 --- a/IOPool/Streamer/src/StreamerFileReader.h +++ b/IOPool/Streamer/src/StreamerFileReader.h @@ -31,6 +31,7 @@ namespace edm { InitMsgView const* getHeader(); EventMsgView const* getNextEvent(); bool newHeader(); + void updateMetaData(bool subsequent); Next checkNext() override; void skip(int toSkip) override; @@ -49,6 +50,7 @@ namespace edm { int initialNumberOfEventsToSkip_; int prefetchMBytes_; bool isFirstFile_ = true; + bool didArtificialFile_ = false; }; } // namespace streamer } // namespace edm diff --git a/IOPool/Streamer/src/StreamerInputSource.cc b/IOPool/Streamer/src/StreamerInputSource.cc index 88f75037ec0d4..844e485833be6 100644 --- a/IOPool/Streamer/src/StreamerInputSource.cc +++ b/IOPool/Streamer/src/StreamerInputSource.cc @@ -56,11 +56,7 @@ namespace edm::streamer { StreamerInputSource::~StreamerInputSource() {} // --------------------------------------- - void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, - ProductRegistry& reg, - BranchIDListHelper& branchIDListHelper, - ThinnedAssociationsHelper& thinnedHelper, - bool subsequent) { + void StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, bool subsequent) { SendDescs const& descs = header.descs(); FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl; @@ -72,8 +68,6 @@ namespace edm::streamer { if (!mergeInfo.empty()) { throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo; } - branchIDListHelper.updateFromInput(header.branchIDLists()); - thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper()); } else { declareStreamers(descs); buildClassCache(descs); @@ -81,8 +75,6 @@ namespace edm::streamer { if (!reg.frozen()) { reg.updateFromInput(descs); } - branchIDListHelper.updateFromInput(header.branchIDLists()); - thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper()); } } @@ -166,7 +158,7 @@ namespace edm::streamer { */ void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) { std::unique_ptr sd = deserializeRegistry(initView); - mergeIntoRegistry(*sd, productRegistryUpdate(), *branchIDListHelper(), *thinnedAssociationsHelper(), subsequent); + mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent); if (subsequent) { adjustEventToNewProductRegistry_ = true; } @@ -179,10 +171,28 @@ namespace edm::streamer { } } + void StreamerInputSource::updateEventMetaData() { + branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists()); + thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper()); + } + + bool StreamerInputSource::isEventMetaData(EventMsgView const& eventView) const { return eventView.run() == 0; } + + uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const { + return eventView.adler32_chksum(); + } + + void StreamerInputSource::deserializeEventMetaData(EventMsgView const& eventView) { + deserializeEventCommon(eventView, true); + } /** * Deserializes the specified event message. */ void StreamerInputSource::deserializeEvent(EventMsgView const& eventView) { + deserializeEventCommon(eventView, false); + } + + void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) { if (eventView.code() != Header::EVENT) throw cms::Exception("StreamTranslation", "Event deserialization error") << "received wrong message type: expected EVENT, got " << eventView.code() << "\n"; @@ -199,7 +209,7 @@ namespace edm::streamer { //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl; //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " " // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl; - if ((uint32)adler32_chksum != eventView.adler32_chksum()) { + if (static_cast(adler32_chksum) != eventView.adler32_chksum()) { // skip event (based on option?) or throw exception? throw cms::Exception("StreamDeserialization", "Checksum error") << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum() @@ -250,13 +260,24 @@ namespace edm::streamer { setRefCoreStreamer(); ; }); - sendEvent_ = std::unique_ptr((SendEvent*)xbuf_.ReadObjectAny(tc_)); + sendEvent_ = std::unique_ptr(reinterpret_cast(xbuf_.ReadObjectAny(tc_))); } if (sendEvent_.get() == nullptr) { throw cms::Exception("StreamTranslation", "Event deserialization error") << "got a null event from input stream\n"; } + + if (isMetaData) { + eventMetaDataChecksum_ = adler32_chksum; + return; + } + + if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) { + throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum() + << " does not match last read meta data " << eventMetaDataChecksum_; + } + processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory()); FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl; diff --git a/IOPool/Streamer/src/StreamerOutputModuleBase.cc b/IOPool/Streamer/src/StreamerOutputModuleBase.cc index ebfa10bdeed1e..6b17eaece784e 100644 --- a/IOPool/Streamer/src/StreamerOutputModuleBase.cc +++ b/IOPool/Streamer/src/StreamerOutputModuleBase.cc @@ -30,14 +30,14 @@ namespace edm::streamer { std::unique_ptr init_message = serializeRegistry(*getSerializerBuffer(), - *branchIDLists(), - *thinnedAssociationsHelper(), OutputModule::processName(), description().moduleLabel(), moduleDescription().mainParameterSetID(), psetMapHandle.isValid() ? psetMapHandle.product() : nullptr); doOutputHeader(*init_message); + lastCallWasBeginRun_ = true; + serializerBuffer_->clearHeaderBuffer(); } @@ -54,7 +54,13 @@ namespace edm::streamer { void StreamerOutputModuleBase::write(EventForOutput const& e) { Handle const& triggerResults = getTriggerResults(trToken_, e); - std::unique_ptr msg = serializeEvent(*getSerializerBuffer(), e, triggerResults, selectorConfig()); + if (lastCallWasBeginRun_) { + auto msg = serializeEventMetaData(*getSerializerBuffer(), *branchIDLists(), *thinnedAssociationsHelper()); + doOutputEvent(*msg); + lastCallWasBeginRun_ = false; + } + auto msg = serializeEvent(*getSerializerBuffer(), e, triggerResults, selectorConfig()); + doOutputEvent(*msg); // You can't use msg in StreamerOutputModuleBase after this point } diff --git a/IOPool/Streamer/src/StreamerOutputModuleCommon.cc b/IOPool/Streamer/src/StreamerOutputModuleCommon.cc index ab3e987af168c..da1912e9882e9 100644 --- a/IOPool/Streamer/src/StreamerOutputModuleCommon.cc +++ b/IOPool/Streamer/src/StreamerOutputModuleCommon.cc @@ -111,16 +111,14 @@ namespace edm::streamer { std::unique_ptr StreamerOutputModuleCommon::serializeRegistry( SerializeDataBuffer& sbuf, - const BranchIDLists& branchLists, - ThinnedAssociationsHelper const& helper, std::string const& processName, std::string const& moduleLabel, ParameterSetID const& toplevel, SendJobHeader::ParameterSetMap const* psetMap) { if (psetMap) { - serializer_.serializeRegistry(sbuf, branchLists, helper, *psetMap); + serializer_.serializeRegistry(sbuf, *psetMap); } else { - serializer_.serializeRegistry(sbuf, branchLists, helper); + serializer_.serializeRegistry(sbuf); } // resize header_buf_ to reflect space used in serializer_ + header // I just added an overhead for header of 50000 for now @@ -219,11 +217,6 @@ namespace edm::streamer { constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size; //Lets Build the Event Message first - //Following is strictly DUMMY Data for L! Trig and will be replaced with actual - // once figured out, there is no logic involved here. - std::vector l1bit = {true, true, false}; - //End of dummy data - std::vector hltbits; setHltMask(e, triggerResults, hltbits); @@ -237,23 +230,50 @@ namespace edm::streamer { // what about overflows? lumi = static_cast(timeInSec / std::abs(lumiSectionInterval_)) + 1; } + serializer_.serializeEvent( + sbuf, e, selectorCfg, eventMetaDataChecksum_, compressionAlgo_, compressionLevel_, reserve_size); - serializer_.serializeEvent(sbuf, e, selectorCfg, compressionAlgo_, compressionLevel_, reserve_size); + return serializeEventCommon(e.id().run(), lumi, e.id().event(), hltbits, hltsize_, sbuf); + } + std::unique_ptr StreamerOutputModuleCommon::serializeEventMetaData( + SerializeDataBuffer& sbuf, BranchIDLists const& branchLists, ThinnedAssociationsHelper const& helper) { + constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size; + //Lets Build the Event Message first + + std::vector hltbits; + serializer_.serializeEventMetaData(sbuf, branchLists, helper, compressionAlgo_, compressionLevel_, reserve_size); + eventMetaDataChecksum_ = sbuf.adler32_chksum_; + + return serializeEventCommon(0, 0, 0, hltbits, 0, sbuf); + } + + std::unique_ptr StreamerOutputModuleCommon::serializeEventCommon(uint32 run, + uint32 lumi, + uint64 event, + std::vector hltbits, + unsigned int hltsize, + SerializeDataBuffer& sbuf) { // resize header_buf_ to reserved size on first written event + constexpr unsigned int reserve_size = SerializeDataBuffer::reserve_size; if (sbuf.header_buf_.size() < reserve_size) sbuf.header_buf_.resize(reserve_size); + //Following is strictly DUMMY Data for L! Trig and will be replaced with actual + // once figured out, there is no logic involved here. + std::vector l1bit = {true, true, false}; + //End of dummy data + auto msg = std::make_unique(&sbuf.header_buf_[0], sbuf.comp_buf_.size(), - e.id().run(), - e.id().event(), + run, + event, lumi, outputModuleId_, 0, l1bit, (uint8*)&hltbits[0], - hltsize_, + hltsize, (uint32)sbuf.adler32_chksum(), host_name_); diff --git a/IOPool/Streamer/test/run_TestRefProductIDMetadataConsistencyStreamer.sh b/IOPool/Streamer/test/run_TestRefProductIDMetadataConsistencyStreamer.sh index 7ba360b52f072..972f0250cb077 100755 --- a/IOPool/Streamer/test/run_TestRefProductIDMetadataConsistencyStreamer.sh +++ b/IOPool/Streamer/test/run_TestRefProductIDMetadataConsistencyStreamer.sh @@ -25,4 +25,4 @@ CatStreamerFiles refconsistency_cat.dat refconsistency_1.dat refconsistency_10.d echo # ... fails -runFailure ${SCRAM_TEST_PATH}/testModuleTypeResolverRefTest_cfg.py --input moduletyperesolver_ref_cat.dat +runSuccess ${SCRAM_TEST_PATH}/testRefProductIDMetadataConsistencyStreamerTest_cfg.py --input refconsistency_cat.dat