From 24d45adfc1bd516b2fdfc7913c0c011166f3151c Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Sun, 12 Mar 2023 23:08:33 +0100 Subject: [PATCH 1/5] allow padding between event headers and ignore another INIT block which could be there due to concenating two files with a header file new streamer unit test for padded files write out files with padding. Used for unit test checking that such files can be opened and read address comments improved unit tests Note: fixed conflict with the unit test due to improved merged into master Conflicts: IOPool/Streamer/test/RunSimple_NewStreamer.sh --- IOPool/Streamer/interface/MsgHeader.h | 3 +- IOPool/Streamer/interface/StreamerFileIO.h | 7 ++- .../Streamer/interface/StreamerOutputFile.h | 2 +- IOPool/Streamer/src/StreamerFileIO.cc | 32 +++++++++- IOPool/Streamer/src/StreamerFileWriter.cc | 5 +- IOPool/Streamer/src/StreamerInputFile.cc | 62 ++++++++++++++++--- IOPool/Streamer/src/StreamerOutputFile.cc | 8 +-- IOPool/Streamer/test/NewStreamCopy_cfg.py | 3 +- IOPool/Streamer/test/NewStreamIn2_cfg.py | 3 +- .../Streamer/test/NewStreamInPadding_cfg.py | 36 +++++++++++ IOPool/Streamer/test/NewStreamIn_cfg.py | 3 +- .../Streamer/test/NewStreamOutPadding_cfg.py | 52 ++++++++++++++++ IOPool/Streamer/test/NewStreamOut_cfg.py | 3 +- IOPool/Streamer/test/RunSimple_NewStreamer.sh | 56 +++++------------ IOPool/Streamer/test/StreamThingAnalyzer.cc | 19 +++++- IOPool/Streamer/test/StreamThingAnalyzer.h | 5 +- 16 files changed, 232 insertions(+), 67 deletions(-) create mode 100644 IOPool/Streamer/test/NewStreamInPadding_cfg.py create mode 100644 IOPool/Streamer/test/NewStreamOutPadding_cfg.py diff --git a/IOPool/Streamer/interface/MsgHeader.h b/IOPool/Streamer/interface/MsgHeader.h index 060fc75ad3cf6..eb84f7747cb36 100644 --- a/IOPool/Streamer/interface/MsgHeader.h +++ b/IOPool/Streamer/interface/MsgHeader.h @@ -27,7 +27,8 @@ struct Header { ERROR_EVENT = 14, FILE_CLOSE_REQUEST = 15, SPARE1 = 16, - SPARE2 = 17 + SPARE2 = 17, + PADDING = 255 //reserved for padding }; }; diff --git a/IOPool/Streamer/interface/StreamerFileIO.h b/IOPool/Streamer/interface/StreamerFileIO.h index f2c7e94dfc46c..f42fead4632f6 100644 --- a/IOPool/Streamer/interface/StreamerFileIO.h +++ b/IOPool/Streamer/interface/StreamerFileIO.h @@ -20,13 +20,14 @@ namespace edm::streamer { */ { public: - explicit OutputFile(const std::string& name); + explicit OutputFile(const std::string& name, uint32 padding = 0); /** CTOR, takes file path name as argument */ ~OutputFile(); - bool write(const char* ptr, size_t n); + bool write(const char* ptr, size_t n, bool doPadding = false); + bool writePadding(); std::string fileName() const { return filename_; } uint64 current_offset() const { return current_offset_; } @@ -42,9 +43,11 @@ namespace edm::streamer { bool do_adler_; uint32 adlera_; uint32 adlerb_; + uint32 padding_; edm::propagate_const> ost_; std::string filename_; + std::unique_ptr paddingBuf_; }; } // namespace edm::streamer #endif diff --git a/IOPool/Streamer/interface/StreamerOutputFile.h b/IOPool/Streamer/interface/StreamerOutputFile.h index f94586333bc66..0bae650e7f83b 100644 --- a/IOPool/Streamer/interface/StreamerOutputFile.h +++ b/IOPool/Streamer/interface/StreamerOutputFile.h @@ -26,7 +26,7 @@ class StreamerOutputFile */ { public: - explicit StreamerOutputFile(const std::string& name); + explicit StreamerOutputFile(const std::string& name, uint32 padding = 0); /** CTOR, takes file path name as argument */ diff --git a/IOPool/Streamer/src/StreamerFileIO.cc b/IOPool/Streamer/src/StreamerFileIO.cc index f64ce331e9be0..41931f9500bf2 100644 --- a/IOPool/Streamer/src/StreamerFileIO.cc +++ b/IOPool/Streamer/src/StreamerFileIO.cc @@ -1,35 +1,61 @@ #include "IOPool/Streamer/interface/StreamerFileIO.h" #include #include +#include #include "FWCore/Utilities/interface/Adler32Calculator.h" #include "FWCore/Utilities/interface/Exception.h" +#include "IOPool/Streamer/interface/MsgHeader.h" namespace edm::streamer { - OutputFile::OutputFile(const std::string& name) + OutputFile::OutputFile(const std::string& name, uint32 padding) : current_offset_(1), do_adler_(false), adlera_(1), adlerb_(0), + padding_(padding), ost_(new std::ofstream(name.c_str(), std::ios_base::binary | std::ios_base::out)), filename_(name) { if (!ost_->is_open()) { throw cms::Exception("OutputFile", "OutputFile") << "Error Opening Output File: " << name << "\n"; } ost_->rdbuf()->pubsetbuf(nullptr, 0); + if (padding_) { + paddingBuf_ = std::make_unique(padding_); + memset(paddingBuf_.get(), Header::PADDING, padding_); + } } OutputFile::~OutputFile() { ost_->close(); } - bool OutputFile::write(const char* ptr, size_t n) { + bool OutputFile::write(const char* ptr, size_t n, bool doPadding) { ost_->write(ptr, n); if (!ost_->fail()) { current_offset_ += (uint64)(n); if (do_adler_) cms::Adler32(ptr, n, adlera_, adlerb_); + if (doPadding && padding_) { + return writePadding(); + } return false; } return true; } - void OutputFile::close() { ost_->close(); } + bool OutputFile::writePadding() { + uint64 mod = ost_->tellp() % padding_; + if (mod) { + uint32 rem = padding_ - (uint32)(mod % padding_); + bool ret = write(paddingBuf_.get(), rem, false); + return ret; + } + return false; + } + + void OutputFile::close() { + if (padding_) + if (writePadding()) + throw cms::Exception("OutputFile", "OutputFile") + << "Error writing padding to the output file: " << filename_ << ": " << std::strerror(errno); + ost_->close(); + } } // namespace edm::streamer diff --git a/IOPool/Streamer/src/StreamerFileWriter.cc b/IOPool/Streamer/src/StreamerFileWriter.cc index 9b330dad6256d..52234ff7d769b 100644 --- a/IOPool/Streamer/src/StreamerFileWriter.cc +++ b/IOPool/Streamer/src/StreamerFileWriter.cc @@ -3,7 +3,8 @@ namespace edm { StreamerFileWriter::StreamerFileWriter(edm::ParameterSet const& ps) - : stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter("fileName"))) {} + : stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter("fileName"), + ps.getUntrackedParameter("padding"))) {} StreamerFileWriter::StreamerFileWriter(std::string const& fileName) : stream_writer_(new StreamerOutputFile(fileName)) {} @@ -34,5 +35,7 @@ namespace edm { void StreamerFileWriter::fillDescription(ParameterSetDescription& desc) { desc.setComment("Writes events into a streamer output file."); desc.addUntracked("fileName", "teststreamfile.dat")->setComment("Name of output file."); + desc.addUntracked("padding", 0) + ->setComment("For testing: INIT and event block size will be rounded to this size padded with 0xff bytes."); } } //namespace edm diff --git a/IOPool/Streamer/src/StreamerInputFile.cc b/IOPool/Streamer/src/StreamerInputFile.cc index 25a9416f0a562..09da2ea8d1ee0 100644 --- a/IOPool/Streamer/src/StreamerInputFile.cc +++ b/IOPool/Streamer/src/StreamerInputFile.cc @@ -179,7 +179,7 @@ namespace edm { void StreamerInputFile::readStartMessage() { using namespace edm::storage; - IOSize nWant = sizeof(HeaderView); + IOSize nWant = sizeof(InitHeader); IOSize nGot = readBytes(&headerBuf_[0], nWant, false).first; if (nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") @@ -202,9 +202,9 @@ namespace edm { if (headerBuf_.size() < headerSize) headerBuf_.resize(headerSize); - if (headerSize > sizeof(HeaderView)) { - nWant = headerSize - sizeof(HeaderView); - auto res = readBytes(&headerBuf_[sizeof(HeaderView)], nWant, true, sizeof(HeaderView)); + if (headerSize > sizeof(InitHeader)) { + nWant = headerSize - sizeof(InitHeader); + auto res = readBytes(&headerBuf_[sizeof(InitHeader)], nWant, true, sizeof(InitHeader)); if (res.first != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Failed reading streamer file, second read in readStartMessage\n"; @@ -270,18 +270,47 @@ namespace edm { using namespace edm::storage; bool eventRead = false; + unsigned hdrSkipped = 0; while (!eventRead) { IOSize nWant = sizeof(EventHeader); - IOSize nGot = readBytes(&eventBuf_[0], nWant, false).first; + IOSize nGot = readBytes(&eventBuf_[hdrSkipped], nWant - hdrSkipped, false).first + hdrSkipped; + while (nGot == nWant) { + //allow padding before next event or end of file. + //event header starts with code 0 - 17, so 0xff (Header:PADDING) uniquely represents padding + bool headerFetched = false; + for (size_t i = 0; i < nGot; i++) { + if ((unsigned char)eventBuf_[i] != Header::PADDING) { + //no padding 0xff + if (i != 0) { + memmove(&eventBuf_[0], &eventBuf_[i], nGot - i); + //read remainder of the header + nGot = nGot - i + readBytes(&eventBuf_[nGot - i], i, false).first; + } + headerFetched = true; + break; + } + } + if (headerFetched) + break; + //read another block + nGot = readBytes(&eventBuf_[0], nWant, false).first; + } if (nGot == 0) { // no more data available endOfFile_ = true; return 0; } if (nGot != nWant) { - throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") - << "Failed reading streamer file, first read in readEventMessage\n" - << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n"; + for (size_t i = 0; i < nGot; i++) { + if ((unsigned char)eventBuf_[i] != Header::PADDING) + throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") + << "Failed reading streamer file, first read in readEventMessage\n" + << "Requested " << nWant << " bytes, read function returned " << nGot + << " bytes, non-padding at offset " << i; + } + //padded 0xff only + endOfFile_ = true; + return 0; } uint32 eventSize; { @@ -289,12 +318,27 @@ namespace edm { uint32 code = head.code(); // If it is not an event then something is wrong. + eventSize = head.size(); if (code != Header::EVENT) { + if (code == Header::INIT) { + edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped"; + if (eventSize < sizeof(EventHeader)) { + //very unlikely case that EventHeader is larger than total INIT size inserted in the middle of the file + hdrSkipped = nGot - eventSize; + memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped); + continue; + } + if (headerBuf_.size() < eventSize) + headerBuf_.resize(eventSize); + memcpy(&headerBuf_[0], &eventBuf_[0], nGot); + readBytes(&headerBuf_[nGot], eventSize, true, nGot); + //do not parse this header and proceed to the next event + continue; + } throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, unknown code in event header\n" << "code = " << code << "\n"; } - eventSize = head.size(); } if (eventSize <= sizeof(EventHeader)) { throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") diff --git a/IOPool/Streamer/src/StreamerOutputFile.cc b/IOPool/Streamer/src/StreamerOutputFile.cc index e856b21a6c55e..0ee1470cf26ab 100644 --- a/IOPool/Streamer/src/StreamerOutputFile.cc +++ b/IOPool/Streamer/src/StreamerOutputFile.cc @@ -3,8 +3,8 @@ StreamerOutputFile::~StreamerOutputFile() {} -StreamerOutputFile::StreamerOutputFile(const std::string& name) - : streamerfile_(std::make_shared(name)) { +StreamerOutputFile::StreamerOutputFile(const std::string& name, uint32 padding) + : streamerfile_(std::make_shared(name, padding)) { streamerfile_->set_do_adler(true); } @@ -18,7 +18,7 @@ uint64 StreamerOutputFile::write(const EventMsgView& ineview) { uint64 offset_to_return = streamerfile_->current_offset(); writeEventHeader(ineview); - bool ret = streamerfile_->write((const char*)ineview.eventData(), ineview.size() - ineview.headerSize()); + bool ret = streamerfile_->write((const char*)ineview.eventData(), ineview.size() - ineview.headerSize(), true); if (ret) { throw cms::Exception("OutputFile", "write(EventMsgView)") << "Error writing streamer event data to " << streamerfile_->fileName() << ". Possibly the output disk " @@ -56,7 +56,7 @@ void StreamerOutputFile::write(const InitMsgBuilder& inview) { void StreamerOutputFile::write(const InitMsgView& inview) { writeStart(inview); - bool ret = streamerfile_->write((const char*)inview.descData(), inview.size() - inview.headerSize()); + bool ret = streamerfile_->write((const char*)inview.descData(), inview.size() - inview.headerSize(), true); if (ret) { throw cms::Exception("OutputFile", "write(InitMsgView)") << "Error writing streamer header data to " << streamerfile_->fileName() << ". Possibly the output disk " diff --git a/IOPool/Streamer/test/NewStreamCopy_cfg.py b/IOPool/Streamer/test/NewStreamCopy_cfg.py index 067edaa474aa8..b37347a97e36c 100644 --- a/IOPool/Streamer/test/NewStreamCopy_cfg.py +++ b/IOPool/Streamer/test/NewStreamCopy_cfg.py @@ -16,7 +16,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.test = cms.EDAnalyzer('RunLumiEventAnalyzer', diff --git a/IOPool/Streamer/test/NewStreamIn2_cfg.py b/IOPool/Streamer/test/NewStreamIn2_cfg.py index a1b327966d619..85619bfb094cd 100644 --- a/IOPool/Streamer/test/NewStreamIn2_cfg.py +++ b/IOPool/Streamer/test/NewStreamIn2_cfg.py @@ -14,7 +14,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("PoolOutputModule", diff --git a/IOPool/Streamer/test/NewStreamInPadding_cfg.py b/IOPool/Streamer/test/NewStreamInPadding_cfg.py new file mode 100644 index 0000000000000..07dc0bdb5bb8e --- /dev/null +++ b/IOPool/Streamer/test/NewStreamInPadding_cfg.py @@ -0,0 +1,36 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('inChecksum', + 'out', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Input checksum file") + +options.parseArguments() + + +process = cms.Process("TRANSFER") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.source = cms.Source("NewEventStreamFileReader", + fileNames = cms.untracked.vstring('file:teststreamfile_padding.dat') + #firstEvent = cms.untracked.uint64(10123456835) +) + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string(options.inChecksum) +) + +process.out = cms.OutputModule("PoolOutputModule", + fileName = cms.untracked.string('myout.root') +) + +process.end = cms.EndPath(process.a1*process.out) diff --git a/IOPool/Streamer/test/NewStreamIn_cfg.py b/IOPool/Streamer/test/NewStreamIn_cfg.py index a3303cc97283f..111ccc6970a94 100644 --- a/IOPool/Streamer/test/NewStreamIn_cfg.py +++ b/IOPool/Streamer/test/NewStreamIn_cfg.py @@ -13,7 +13,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("PoolOutputModule", diff --git a/IOPool/Streamer/test/NewStreamOutPadding_cfg.py b/IOPool/Streamer/test/NewStreamOutPadding_cfg.py new file mode 100644 index 0000000000000..92f1132d30aea --- /dev/null +++ b/IOPool/Streamer/test/NewStreamOutPadding_cfg.py @@ -0,0 +1,52 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1'), + outChecksum = cms.untracked.string('outPadded') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile_padding.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out) diff --git a/IOPool/Streamer/test/NewStreamOut_cfg.py b/IOPool/Streamer/test/NewStreamOut_cfg.py index 55075124eeda6..48f2214cae909 100644 --- a/IOPool/Streamer/test/NewStreamOut_cfg.py +++ b/IOPool/Streamer/test/NewStreamOut_cfg.py @@ -35,7 +35,8 @@ process.m2 = cms.EDProducer("NonProducer") process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + outChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("EventStreamFileWriter", diff --git a/IOPool/Streamer/test/RunSimple_NewStreamer.sh b/IOPool/Streamer/test/RunSimple_NewStreamer.sh index d890912ecd071..c17a96adbc638 100755 --- a/IOPool/Streamer/test/RunSimple_NewStreamer.sh +++ b/IOPool/Streamer/test/RunSimple_NewStreamer.sh @@ -1,6 +1,6 @@ #!/bin/bash -function die { echo Failure $1: status $2 ; exit $2 ; } +function die { echo Failure $1: status $2 ; echo ""; cat log ; exit $2 ; } if [ -z $LOCAL_TEST_DIR ]; then LOCAL_TEST_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" @@ -28,48 +28,26 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} -cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > out 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutAlt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outAlt 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutExt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outExt 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outExt 2>&1 || die "cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun --parameter-set NewStreamIn_cfg.py > in 2>&1 || die "cmsRun NewStreamIn_cfg.py" $? -cmsRun --parameter-set NewStreamIn2_cfg.py > in2 2>&1 || die "cmsRun NewStreamIn2_cfg.py" $? -cmsRun --parameter-set NewStreamCopy_cfg.py > copy 2>&1 || die "cmsRun NewStreamCopy_cfg.py" $? -cmsRun --parameter-set NewStreamCopy2_cfg.py > copy2 2>&1 || die "cmsRun NewStreamCopy2_cfg.py" $? -cmsRun --parameter-set NewStreamInAlt_cfg.py > alt 2>&1 || die "cmsRun NewStreamInAlt_cfg.py" $? -cmsRun --parameter-set NewStreamInExt_cfg.py > ext 2>&1 || die "cmsRun NewStreamInExt_cfg.py" $? -cmsRun --parameter-set NewStreamInExtBuf_cfg.py > ext 2>&1 || die "cmsRun NewStreamInExtBuf_cfg.py" $? +cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutAlt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutExt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutPadding_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOutPadding_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamIn_cfg.py > log 2>&1 || die "cmsRun NewStreamIn_cfg.py" $? +cmsRun NewStreamIn2_cfg.py > log 2>&1 || die "cmsRun NewStreamIn2_cfg.py" $? +cmsRun NewStreamCopy_cfg.py > log 2>&1 || die "cmsRun NewStreamCopy_cfg.py" $? +cmsRun NewStreamCopy2_cfg.py > log 2>&1 || die "cmsRun NewStreamCopy2_cfg.py" $? +cmsRun NewStreamInAlt_cfg.py > log 2>&1 || die "cmsRun NewStreamInAlt_cfg.py" $? +cmsRun NewStreamInExt_cfg.py > log 2>&1 || die "cmsRun NewStreamInExt_cfg.py" $? +cmsRun NewStreamInExtBuf_cfg.py > log 2>&1 || die "cmsRun NewStreamInExtBuf_cfg.py" $? +cmsRun NewStreamInPadding_cfg.py > log 2>&1 || die "cmsRun NewStreamInPadding_cfg.py (1)" $? +cmsRun NewStreamInPadding_cfg.py inChecksum=outPadded > log 2>&1 || die "cmsRun NewStreamInPadding_cfg.py (2)" $? # echo "CHECKSUM = 1" > out -# echo "CHECKSUM = 1" > in -ANS_OUT_SIZE=`grep -c CHECKSUM out` -ANS_OUT=`grep CHECKSUM out` -ANS_IN=`grep CHECKSUM in` -ANS_IN2=`grep CHECKSUM in2` -ANS_COPY=`grep CHECKSUM copy` +if [ ! -s out ]; then -if [ "${ANS_OUT_SIZE}" == "0" ] -then - echo "New Stream Test Failed (out was not created)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_IN}" ] -then - echo "New Stream Test Failed (out!=in)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_IN2}" ] -then - echo "New Stream Test Failed (out!=in2)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_COPY}" ] -then - echo "New Stream Test Failed (copy!=out)" + echo "New Stream Test Failed (out was not created or is empty)" RC=1 fi diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.cc b/IOPool/Streamer/test/StreamThingAnalyzer.cc index d019109c3483a..ddecef40ce62b 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.cc +++ b/IOPool/Streamer/test/StreamThingAnalyzer.cc @@ -22,6 +22,8 @@ namespace edmtest_thing { : name_(ps.getParameter("product_to_get")), total_(), out_("gennums.txt"), + inChecksumFile_(ps.getUntrackedParameter("inChecksum", "")), + outChecksumFile_(ps.getUntrackedParameter("outChecksum", "")), cnt_(), getterUsingLabel_(edm::ModuleLabelMatch(name_), this) { callWhenNewProductsRegistered(getterUsingLabel_); @@ -35,7 +37,22 @@ namespace edmtest_thing { //edm::LogInfo("stuff") << "again, ctor completing"; } - StreamThingAnalyzer::~StreamThingAnalyzer() { std::cout << "\nSTREAMTHING_CHECKSUM " << total_ << "\n" << std::endl; } + void StreamThingAnalyzer::endJob() { + edm::LogInfo("StreamThingAnalyzer") << "STREAMTHING_CHECKSUM " << total_; + if (!outChecksumFile_.empty()) { + edm::LogInfo("StreamThingAnalyzer") << "Writing checksum to " << outChecksumFile_; + std::ofstream outChecksum(outChecksumFile_); + outChecksum << total_; + } else if (!inChecksumFile_.empty()) { + edm::LogInfo("StreamThingAnalyzer") << "Reading checksum from " << inChecksumFile_; + int totalIn = 0; + std::ifstream inChecksum(inChecksumFile_); + inChecksum >> totalIn; + if (totalIn != total_) + throw cms::Exception("StreamThingAnalyzer") + << "Checksum mismatch input: " << totalIn << " compared to: " << total_; + } + } void StreamThingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) { typedef std::vector > ProdList; diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.h b/IOPool/Streamer/test/StreamThingAnalyzer.h index 97a4d82024b9a..d2aa75602fbf9 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.h +++ b/IOPool/Streamer/test/StreamThingAnalyzer.h @@ -21,8 +21,7 @@ namespace edmtest_thing { class StreamThingAnalyzer : public edm::one::EDAnalyzer<> { public: explicit StreamThingAnalyzer(edm::ParameterSet const&); - - ~StreamThingAnalyzer() override; + void endJob() override; void analyze(edm::Event const& e, edm::EventSetup const& c) override; @@ -30,6 +29,8 @@ namespace edmtest_thing { std::string name_; int total_; std::ofstream out_; + std::string inChecksumFile_; + std::string outChecksumFile_; int cnt_; edm::GetterOfProducts getterUsingLabel_; }; From 06c5a1a170cd91e835badeaee50f369cea583afd Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Thu, 23 Mar 2023 18:48:51 +0100 Subject: [PATCH 2/5] add back stream thing analyzer message --- IOPool/Streamer/test/StreamThingAnalyzer.cc | 2 ++ IOPool/Streamer/test/StreamThingAnalyzer.h | 3 +++ 2 files changed, 5 insertions(+) diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.cc b/IOPool/Streamer/test/StreamThingAnalyzer.cc index ddecef40ce62b..93bb56384df8d 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.cc +++ b/IOPool/Streamer/test/StreamThingAnalyzer.cc @@ -37,6 +37,8 @@ namespace edmtest_thing { //edm::LogInfo("stuff") << "again, ctor completing"; } + StreamThingAnalyzer::~StreamThingAnalyzer() { std::cout << "\nSTREAMTHING_CHECKSUM " << total_ << "\n" << std::endl; } + void StreamThingAnalyzer::endJob() { edm::LogInfo("StreamThingAnalyzer") << "STREAMTHING_CHECKSUM " << total_; if (!outChecksumFile_.empty()) { diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.h b/IOPool/Streamer/test/StreamThingAnalyzer.h index d2aa75602fbf9..30af1f2c5e4b7 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.h +++ b/IOPool/Streamer/test/StreamThingAnalyzer.h @@ -21,6 +21,9 @@ namespace edmtest_thing { class StreamThingAnalyzer : public edm::one::EDAnalyzer<> { public: explicit StreamThingAnalyzer(edm::ParameterSet const&); + + ~StreamThingAnalyzer() override; + void endJob() override; void analyze(edm::Event const& e, edm::EventSetup const& c) override; From eb3337fe5525db82330b70101799205f23521d7a Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Thu, 23 Mar 2023 18:49:26 +0100 Subject: [PATCH 3/5] add padded streamer file test to DQM Conflicts: DQMServices/StreamerIO/test/RunStreamer.sh --- DQMServices/StreamerIO/test/RunStreamer.sh | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/DQMServices/StreamerIO/test/RunStreamer.sh b/DQMServices/StreamerIO/test/RunStreamer.sh index 4e98455575eba..d1727e124c357 100755 --- a/DQMServices/StreamerIO/test/RunStreamer.sh +++ b/DQMServices/StreamerIO/test/RunStreamer.sh @@ -42,12 +42,21 @@ rm run000001/run000001_ls0000_EoR.jsn mv teststreamfile_ext.dat run000001/teststreamfile_ext.dat timeout --signal SIGTERM 180 cmsRun streamInExt_cfg.py > ext 2>&1 || die "cmsRun streamInExt_cfg.py" $? +echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn +cmsRun streamOutPadding_cfg.py > outPadding 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? +mv teststreamfile_padding.dat run000001/teststreamfile.dat +timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inPadding 2>&1 || die "cmsRun streamIn_cfg.py" $? + + # echo "CHECKSUM = 1" > out # echo "CHECKSUM = 1" > in ANS_OUT_SIZE=`grep -c CHECKSUM out` +ANS_OUT_PADD_SIZE=`grep -c CHECKSUM outPadding` ANS_OUT=`grep CHECKSUM out` +ANS_OUT_PADD=`grep CHECKSUM outPadding` ANS_IN=`grep CHECKSUM in` +ANS_IN_PADD=`grep CHECKSUM inPadding` if [ "${ANS_OUT_SIZE}" == "0" ] then @@ -55,11 +64,22 @@ then RC=1 fi +if [ "${ANS_OUT_PADD_SIZE}" == "0" ] +then + echo "New Padded Stream Test Failed (out was not created)" + RC=1 +fi + if [ "${ANS_OUT}" != "${ANS_IN}" ] then echo "New Stream Test Failed (out!=in)" RC=1 fi -#rm -rf ${OUTDIR} +if [ "${ANS_OUT_PADD}" != "${ANS_IN_PADD}" ] +then + echo "New Padded Stream Test Failed (out!=in)" + RC=1 +fi + exit ${RC} From 2b54203f150b1ee162d96c3392c00613deb3e3ac Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Thu, 23 Mar 2023 20:25:00 +0100 Subject: [PATCH 4/5] Add padded streamer format check for DQM streamer source Conflicts: DQMServices/StreamerIO/test/RunStreamer.sh --- DQMServices/StreamerIO/test/RunStreamer.sh | 37 ++++++++------ .../StreamerIO/test/streamOutPadding_cfg.py | 51 +++++++++++++++++++ 2 files changed, 73 insertions(+), 15 deletions(-) create mode 100644 DQMServices/StreamerIO/test/streamOutPadding_cfg.py diff --git a/DQMServices/StreamerIO/test/RunStreamer.sh b/DQMServices/StreamerIO/test/RunStreamer.sh index d1727e124c357..32cf12a6f3a52 100755 --- a/DQMServices/StreamerIO/test/RunStreamer.sh +++ b/DQMServices/StreamerIO/test/RunStreamer.sh @@ -23,7 +23,20 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} +rm -rf run000001 mkdir run000001 + +#test checksum with padded format +echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn +cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? +mv teststreamfile.dat run000001/teststreamfile.dat +cmsRun streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $? +cmsRun streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $? + +rm -rf run000001 +mkdir run000001 + #the initial json file to read echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $? @@ -42,21 +55,15 @@ rm run000001/run000001_ls0000_EoR.jsn mv teststreamfile_ext.dat run000001/teststreamfile_ext.dat timeout --signal SIGTERM 180 cmsRun streamInExt_cfg.py > ext 2>&1 || die "cmsRun streamInExt_cfg.py" $? -echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn -cmsRun streamOutPadding_cfg.py > outPadding 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? -mv teststreamfile_padding.dat run000001/teststreamfile.dat -timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inPadding 2>&1 || die "cmsRun streamIn_cfg.py" $? - - # echo "CHECKSUM = 1" > out # echo "CHECKSUM = 1" > in ANS_OUT_SIZE=`grep -c CHECKSUM out` -ANS_OUT_PADD_SIZE=`grep -c CHECKSUM outPadding` ANS_OUT=`grep CHECKSUM out` -ANS_OUT_PADD=`grep CHECKSUM outPadding` ANS_IN=`grep CHECKSUM in` -ANS_IN_PADD=`grep CHECKSUM inPadding` +ANS_OUTP_SIZE=`grep -c CHECKSUM outp` +ANS_OUTP=`grep CHECKSUM outp` +ANS_INP=`grep CHECKSUM inp` if [ "${ANS_OUT_SIZE}" == "0" ] then @@ -64,21 +71,21 @@ then RC=1 fi -if [ "${ANS_OUT_PADD_SIZE}" == "0" ] +if [ "${ANS_OUT}" != "${ANS_IN}" ] then - echo "New Padded Stream Test Failed (out was not created)" + echo "New Stream Test Failed (out!=in)" RC=1 fi -if [ "${ANS_OUT}" != "${ANS_IN}" ] +if [ "${ANS_OUTP_SIZE}" == "0" ] then - echo "New Stream Test Failed (out!=in)" + echo "New Stream Padded Test Failed (out was not created)" RC=1 fi -if [ "${ANS_OUT_PADD}" != "${ANS_IN_PADD}" ] +if [ "${ANS_OUTP}" != "${ANS_INP}" ] then - echo "New Padded Stream Test Failed (out!=in)" + echo "New Stream Padded Test Failed (out!=in)" RC=1 fi diff --git a/DQMServices/StreamerIO/test/streamOutPadding_cfg.py b/DQMServices/StreamerIO/test/streamOutPadding_cfg.py new file mode 100644 index 0000000000000..72b0bba5cfce2 --- /dev/null +++ b/DQMServices/StreamerIO/test/streamOutPadding_cfg.py @@ -0,0 +1,51 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out) From c350aec4cd330cd3e2debcbe758f88314ff6671d Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Thu, 23 Mar 2023 20:50:22 +0100 Subject: [PATCH 5/5] add padded streamer format unit trst for EcalCalibration Conflicts: CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh --- .../EcalLaserSorting/test/RunStreamer.sh | 24 ++++++++- .../test/streamOutPadding_cfg.py | 51 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py diff --git a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh index 2ec9477703093..10e2bab9b09d6 100755 --- a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh +++ b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh @@ -23,6 +23,13 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} +mkdir inDir +cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? +cp teststreamfile.dat teststreamfile.padding +mv teststreamfile.dat inDir/ +timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $? +rm -rf inDir + mkdir inDir cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $? cp teststreamfile.dat teststreamfile.original @@ -50,6 +57,10 @@ ANS_OUT_SIZE=`grep -c CHECKSUM out` ANS_OUT=`grep CHECKSUM out` ANS_IN=`grep CHECKSUM in` +ANS_OUTP_SIZE=`grep -c CHECKSUM outp` +ANS_OUTP=`grep CHECKSUM outp` +ANS_INP=`grep CHECKSUM inp` + if [ "${ANS_OUT_SIZE}" == "0" ] then echo "New Stream Test Failed (out was not created)" @@ -62,5 +73,16 @@ then RC=1 fi -#rm -rf ${OUTDIR} +if [ "${ANS_OUTP_SIZE}" == "0" ] +then + echo "New Stream Test Failed (out was not created)" + RC=1 +fi + +if [ "${ANS_OUTP}" != "${ANS_INP}" ] +then + echo "New Stream Test Failed (out!=in)" + RC=1 +fi + exit ${RC} diff --git a/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py b/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py new file mode 100644 index 0000000000000..72b0bba5cfce2 --- /dev/null +++ b/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py @@ -0,0 +1,51 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out)