diff --git a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh index 2ec9477703093..10e2bab9b09d6 100755 --- a/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh +++ b/CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh @@ -23,6 +23,13 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} +mkdir inDir +cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? +cp teststreamfile.dat teststreamfile.padding +mv teststreamfile.dat inDir/ +timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $? +rm -rf inDir + mkdir inDir cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $? cp teststreamfile.dat teststreamfile.original @@ -50,6 +57,10 @@ ANS_OUT_SIZE=`grep -c CHECKSUM out` ANS_OUT=`grep CHECKSUM out` ANS_IN=`grep CHECKSUM in` +ANS_OUTP_SIZE=`grep -c CHECKSUM outp` +ANS_OUTP=`grep CHECKSUM outp` +ANS_INP=`grep CHECKSUM inp` + if [ "${ANS_OUT_SIZE}" == "0" ] then echo "New Stream Test Failed (out was not created)" @@ -62,5 +73,16 @@ then RC=1 fi -#rm -rf ${OUTDIR} +if [ "${ANS_OUTP_SIZE}" == "0" ] +then + echo "New Stream Test Failed (out was not created)" + RC=1 +fi + +if [ "${ANS_OUTP}" != "${ANS_INP}" ] +then + echo "New Stream Test Failed (out!=in)" + RC=1 +fi + exit ${RC} diff --git a/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py b/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py new file mode 100644 index 0000000000000..72b0bba5cfce2 --- /dev/null +++ b/CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py @@ -0,0 +1,51 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out) diff --git a/DQMServices/StreamerIO/test/RunStreamer.sh b/DQMServices/StreamerIO/test/RunStreamer.sh index 4e98455575eba..32cf12a6f3a52 100755 --- a/DQMServices/StreamerIO/test/RunStreamer.sh +++ b/DQMServices/StreamerIO/test/RunStreamer.sh @@ -23,7 +23,20 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} +rm -rf run000001 mkdir run000001 + +#test checksum with padded format +echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn +cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $? +mv teststreamfile.dat run000001/teststreamfile.dat +cmsRun streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $? +cmsRun streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $? +timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $? + +rm -rf run000001 +mkdir run000001 + #the initial json file to read echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $? @@ -48,6 +61,9 @@ timeout --signal SIGTERM 180 cmsRun streamInExt_cfg.py > ext 2>&1 || die "cmsR ANS_OUT_SIZE=`grep -c CHECKSUM out` ANS_OUT=`grep CHECKSUM out` ANS_IN=`grep CHECKSUM in` +ANS_OUTP_SIZE=`grep -c CHECKSUM outp` +ANS_OUTP=`grep CHECKSUM outp` +ANS_INP=`grep CHECKSUM inp` if [ "${ANS_OUT_SIZE}" == "0" ] then @@ -61,5 +77,16 @@ then RC=1 fi -#rm -rf ${OUTDIR} +if [ "${ANS_OUTP_SIZE}" == "0" ] +then + echo "New Stream Padded Test Failed (out was not created)" + RC=1 +fi + +if [ "${ANS_OUTP}" != "${ANS_INP}" ] +then + echo "New Stream Padded Test Failed (out!=in)" + RC=1 +fi + exit ${RC} diff --git a/DQMServices/StreamerIO/test/streamOutPadding_cfg.py b/DQMServices/StreamerIO/test/streamOutPadding_cfg.py new file mode 100644 index 0000000000000..72b0bba5cfce2 --- /dev/null +++ b/DQMServices/StreamerIO/test/streamOutPadding_cfg.py @@ -0,0 +1,51 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out) diff --git a/IOPool/Streamer/interface/MsgHeader.h b/IOPool/Streamer/interface/MsgHeader.h index 060fc75ad3cf6..eb84f7747cb36 100644 --- a/IOPool/Streamer/interface/MsgHeader.h +++ b/IOPool/Streamer/interface/MsgHeader.h @@ -27,7 +27,8 @@ struct Header { ERROR_EVENT = 14, FILE_CLOSE_REQUEST = 15, SPARE1 = 16, - SPARE2 = 17 + SPARE2 = 17, + PADDING = 255 //reserved for padding }; }; diff --git a/IOPool/Streamer/interface/StreamerFileIO.h b/IOPool/Streamer/interface/StreamerFileIO.h index f2c7e94dfc46c..f42fead4632f6 100644 --- a/IOPool/Streamer/interface/StreamerFileIO.h +++ b/IOPool/Streamer/interface/StreamerFileIO.h @@ -20,13 +20,14 @@ namespace edm::streamer { */ { public: - explicit OutputFile(const std::string& name); + explicit OutputFile(const std::string& name, uint32 padding = 0); /** CTOR, takes file path name as argument */ ~OutputFile(); - bool write(const char* ptr, size_t n); + bool write(const char* ptr, size_t n, bool doPadding = false); + bool writePadding(); std::string fileName() const { return filename_; } uint64 current_offset() const { return current_offset_; } @@ -42,9 +43,11 @@ namespace edm::streamer { bool do_adler_; uint32 adlera_; uint32 adlerb_; + uint32 padding_; edm::propagate_const> ost_; std::string filename_; + std::unique_ptr paddingBuf_; }; } // namespace edm::streamer #endif diff --git a/IOPool/Streamer/interface/StreamerOutputFile.h b/IOPool/Streamer/interface/StreamerOutputFile.h index f94586333bc66..0bae650e7f83b 100644 --- a/IOPool/Streamer/interface/StreamerOutputFile.h +++ b/IOPool/Streamer/interface/StreamerOutputFile.h @@ -26,7 +26,7 @@ class StreamerOutputFile */ { public: - explicit StreamerOutputFile(const std::string& name); + explicit StreamerOutputFile(const std::string& name, uint32 padding = 0); /** CTOR, takes file path name as argument */ diff --git a/IOPool/Streamer/src/StreamerFileIO.cc b/IOPool/Streamer/src/StreamerFileIO.cc index f64ce331e9be0..41931f9500bf2 100644 --- a/IOPool/Streamer/src/StreamerFileIO.cc +++ b/IOPool/Streamer/src/StreamerFileIO.cc @@ -1,35 +1,61 @@ #include "IOPool/Streamer/interface/StreamerFileIO.h" #include #include +#include #include "FWCore/Utilities/interface/Adler32Calculator.h" #include "FWCore/Utilities/interface/Exception.h" +#include "IOPool/Streamer/interface/MsgHeader.h" namespace edm::streamer { - OutputFile::OutputFile(const std::string& name) + OutputFile::OutputFile(const std::string& name, uint32 padding) : current_offset_(1), do_adler_(false), adlera_(1), adlerb_(0), + padding_(padding), ost_(new std::ofstream(name.c_str(), std::ios_base::binary | std::ios_base::out)), filename_(name) { if (!ost_->is_open()) { throw cms::Exception("OutputFile", "OutputFile") << "Error Opening Output File: " << name << "\n"; } ost_->rdbuf()->pubsetbuf(nullptr, 0); + if (padding_) { + paddingBuf_ = std::make_unique(padding_); + memset(paddingBuf_.get(), Header::PADDING, padding_); + } } OutputFile::~OutputFile() { ost_->close(); } - bool OutputFile::write(const char* ptr, size_t n) { + bool OutputFile::write(const char* ptr, size_t n, bool doPadding) { ost_->write(ptr, n); if (!ost_->fail()) { current_offset_ += (uint64)(n); if (do_adler_) cms::Adler32(ptr, n, adlera_, adlerb_); + if (doPadding && padding_) { + return writePadding(); + } return false; } return true; } - void OutputFile::close() { ost_->close(); } + bool OutputFile::writePadding() { + uint64 mod = ost_->tellp() % padding_; + if (mod) { + uint32 rem = padding_ - (uint32)(mod % padding_); + bool ret = write(paddingBuf_.get(), rem, false); + return ret; + } + return false; + } + + void OutputFile::close() { + if (padding_) + if (writePadding()) + throw cms::Exception("OutputFile", "OutputFile") + << "Error writing padding to the output file: " << filename_ << ": " << std::strerror(errno); + ost_->close(); + } } // namespace edm::streamer diff --git a/IOPool/Streamer/src/StreamerFileWriter.cc b/IOPool/Streamer/src/StreamerFileWriter.cc index 9b330dad6256d..52234ff7d769b 100644 --- a/IOPool/Streamer/src/StreamerFileWriter.cc +++ b/IOPool/Streamer/src/StreamerFileWriter.cc @@ -3,7 +3,8 @@ namespace edm { StreamerFileWriter::StreamerFileWriter(edm::ParameterSet const& ps) - : stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter("fileName"))) {} + : stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter("fileName"), + ps.getUntrackedParameter("padding"))) {} StreamerFileWriter::StreamerFileWriter(std::string const& fileName) : stream_writer_(new StreamerOutputFile(fileName)) {} @@ -34,5 +35,7 @@ namespace edm { void StreamerFileWriter::fillDescription(ParameterSetDescription& desc) { desc.setComment("Writes events into a streamer output file."); desc.addUntracked("fileName", "teststreamfile.dat")->setComment("Name of output file."); + desc.addUntracked("padding", 0) + ->setComment("For testing: INIT and event block size will be rounded to this size padded with 0xff bytes."); } } //namespace edm diff --git a/IOPool/Streamer/src/StreamerInputFile.cc b/IOPool/Streamer/src/StreamerInputFile.cc index 25a9416f0a562..09da2ea8d1ee0 100644 --- a/IOPool/Streamer/src/StreamerInputFile.cc +++ b/IOPool/Streamer/src/StreamerInputFile.cc @@ -179,7 +179,7 @@ namespace edm { void StreamerInputFile::readStartMessage() { using namespace edm::storage; - IOSize nWant = sizeof(HeaderView); + IOSize nWant = sizeof(InitHeader); IOSize nGot = readBytes(&headerBuf_[0], nWant, false).first; if (nGot != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") @@ -202,9 +202,9 @@ namespace edm { if (headerBuf_.size() < headerSize) headerBuf_.resize(headerSize); - if (headerSize > sizeof(HeaderView)) { - nWant = headerSize - sizeof(HeaderView); - auto res = readBytes(&headerBuf_[sizeof(HeaderView)], nWant, true, sizeof(HeaderView)); + if (headerSize > sizeof(InitHeader)) { + nWant = headerSize - sizeof(InitHeader); + auto res = readBytes(&headerBuf_[sizeof(InitHeader)], nWant, true, sizeof(InitHeader)); if (res.first != nWant) { throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage") << "Failed reading streamer file, second read in readStartMessage\n"; @@ -270,18 +270,47 @@ namespace edm { using namespace edm::storage; bool eventRead = false; + unsigned hdrSkipped = 0; while (!eventRead) { IOSize nWant = sizeof(EventHeader); - IOSize nGot = readBytes(&eventBuf_[0], nWant, false).first; + IOSize nGot = readBytes(&eventBuf_[hdrSkipped], nWant - hdrSkipped, false).first + hdrSkipped; + while (nGot == nWant) { + //allow padding before next event or end of file. + //event header starts with code 0 - 17, so 0xff (Header:PADDING) uniquely represents padding + bool headerFetched = false; + for (size_t i = 0; i < nGot; i++) { + if ((unsigned char)eventBuf_[i] != Header::PADDING) { + //no padding 0xff + if (i != 0) { + memmove(&eventBuf_[0], &eventBuf_[i], nGot - i); + //read remainder of the header + nGot = nGot - i + readBytes(&eventBuf_[nGot - i], i, false).first; + } + headerFetched = true; + break; + } + } + if (headerFetched) + break; + //read another block + nGot = readBytes(&eventBuf_[0], nWant, false).first; + } if (nGot == 0) { // no more data available endOfFile_ = true; return 0; } if (nGot != nWant) { - throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") - << "Failed reading streamer file, first read in readEventMessage\n" - << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n"; + for (size_t i = 0; i < nGot; i++) { + if ((unsigned char)eventBuf_[i] != Header::PADDING) + throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") + << "Failed reading streamer file, first read in readEventMessage\n" + << "Requested " << nWant << " bytes, read function returned " << nGot + << " bytes, non-padding at offset " << i; + } + //padded 0xff only + endOfFile_ = true; + return 0; } uint32 eventSize; { @@ -289,12 +318,27 @@ namespace edm { uint32 code = head.code(); // If it is not an event then something is wrong. + eventSize = head.size(); if (code != Header::EVENT) { + if (code == Header::INIT) { + edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped"; + if (eventSize < sizeof(EventHeader)) { + //very unlikely case that EventHeader is larger than total INIT size inserted in the middle of the file + hdrSkipped = nGot - eventSize; + memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped); + continue; + } + if (headerBuf_.size() < eventSize) + headerBuf_.resize(eventSize); + memcpy(&headerBuf_[0], &eventBuf_[0], nGot); + readBytes(&headerBuf_[nGot], eventSize, true, nGot); + //do not parse this header and proceed to the next event + continue; + } throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") << "Failed reading streamer file, unknown code in event header\n" << "code = " << code << "\n"; } - eventSize = head.size(); } if (eventSize <= sizeof(EventHeader)) { throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage") diff --git a/IOPool/Streamer/src/StreamerOutputFile.cc b/IOPool/Streamer/src/StreamerOutputFile.cc index e856b21a6c55e..0ee1470cf26ab 100644 --- a/IOPool/Streamer/src/StreamerOutputFile.cc +++ b/IOPool/Streamer/src/StreamerOutputFile.cc @@ -3,8 +3,8 @@ StreamerOutputFile::~StreamerOutputFile() {} -StreamerOutputFile::StreamerOutputFile(const std::string& name) - : streamerfile_(std::make_shared(name)) { +StreamerOutputFile::StreamerOutputFile(const std::string& name, uint32 padding) + : streamerfile_(std::make_shared(name, padding)) { streamerfile_->set_do_adler(true); } @@ -18,7 +18,7 @@ uint64 StreamerOutputFile::write(const EventMsgView& ineview) { uint64 offset_to_return = streamerfile_->current_offset(); writeEventHeader(ineview); - bool ret = streamerfile_->write((const char*)ineview.eventData(), ineview.size() - ineview.headerSize()); + bool ret = streamerfile_->write((const char*)ineview.eventData(), ineview.size() - ineview.headerSize(), true); if (ret) { throw cms::Exception("OutputFile", "write(EventMsgView)") << "Error writing streamer event data to " << streamerfile_->fileName() << ". Possibly the output disk " @@ -56,7 +56,7 @@ void StreamerOutputFile::write(const InitMsgBuilder& inview) { void StreamerOutputFile::write(const InitMsgView& inview) { writeStart(inview); - bool ret = streamerfile_->write((const char*)inview.descData(), inview.size() - inview.headerSize()); + bool ret = streamerfile_->write((const char*)inview.descData(), inview.size() - inview.headerSize(), true); if (ret) { throw cms::Exception("OutputFile", "write(InitMsgView)") << "Error writing streamer header data to " << streamerfile_->fileName() << ". Possibly the output disk " diff --git a/IOPool/Streamer/test/NewStreamCopy_cfg.py b/IOPool/Streamer/test/NewStreamCopy_cfg.py index 067edaa474aa8..b37347a97e36c 100644 --- a/IOPool/Streamer/test/NewStreamCopy_cfg.py +++ b/IOPool/Streamer/test/NewStreamCopy_cfg.py @@ -16,7 +16,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.test = cms.EDAnalyzer('RunLumiEventAnalyzer', diff --git a/IOPool/Streamer/test/NewStreamIn2_cfg.py b/IOPool/Streamer/test/NewStreamIn2_cfg.py index a1b327966d619..85619bfb094cd 100644 --- a/IOPool/Streamer/test/NewStreamIn2_cfg.py +++ b/IOPool/Streamer/test/NewStreamIn2_cfg.py @@ -14,7 +14,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("PoolOutputModule", diff --git a/IOPool/Streamer/test/NewStreamInPadding_cfg.py b/IOPool/Streamer/test/NewStreamInPadding_cfg.py new file mode 100644 index 0000000000000..07dc0bdb5bb8e --- /dev/null +++ b/IOPool/Streamer/test/NewStreamInPadding_cfg.py @@ -0,0 +1,36 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('inChecksum', + 'out', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Input checksum file") + +options.parseArguments() + + +process = cms.Process("TRANSFER") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.source = cms.Source("NewEventStreamFileReader", + fileNames = cms.untracked.vstring('file:teststreamfile_padding.dat') + #firstEvent = cms.untracked.uint64(10123456835) +) + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string(options.inChecksum) +) + +process.out = cms.OutputModule("PoolOutputModule", + fileName = cms.untracked.string('myout.root') +) + +process.end = cms.EndPath(process.a1*process.out) diff --git a/IOPool/Streamer/test/NewStreamIn_cfg.py b/IOPool/Streamer/test/NewStreamIn_cfg.py index a3303cc97283f..111ccc6970a94 100644 --- a/IOPool/Streamer/test/NewStreamIn_cfg.py +++ b/IOPool/Streamer/test/NewStreamIn_cfg.py @@ -13,7 +13,8 @@ ) process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + inChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("PoolOutputModule", diff --git a/IOPool/Streamer/test/NewStreamOutPadding_cfg.py b/IOPool/Streamer/test/NewStreamOutPadding_cfg.py new file mode 100644 index 0000000000000..92f1132d30aea --- /dev/null +++ b/IOPool/Streamer/test/NewStreamOutPadding_cfg.py @@ -0,0 +1,52 @@ +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +options = VarParsing.VarParsing('analysis') + +options.register ('compAlgo', + 'ZLIB', # default value + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Compression Algorithm") + +options.parseArguments() + + +process = cms.Process("HLT") + +import FWCore.Framework.test.cmsExceptionsFatal_cff +process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options + +process.load("FWCore.MessageLogger.MessageLogger_cfi") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(50) +) + +process.source = cms.Source("EmptySource", + firstEvent = cms.untracked.uint64(10123456789) +) + +process.m1 = cms.EDProducer("StreamThingProducer", + instance_count = cms.int32(5), + array_size = cms.int32(2) +) + +process.m2 = cms.EDProducer("NonProducer") + +process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", + product_to_get = cms.string('m1'), + outChecksum = cms.untracked.string('outPadded') +) + +process.out = cms.OutputModule("EventStreamFileWriter", + fileName = cms.untracked.string('teststreamfile_padding.dat'), + padding = cms.untracked.uint32(4096), + compression_level = cms.untracked.int32(1), + use_compression = cms.untracked.bool(True), + compression_algorithm = cms.untracked.string(options.compAlgo), + max_event_size = cms.untracked.int32(7000000) +) + +process.p1 = cms.Path(process.m1*process.a1*process.m2) +process.end = cms.EndPath(process.out) diff --git a/IOPool/Streamer/test/NewStreamOut_cfg.py b/IOPool/Streamer/test/NewStreamOut_cfg.py index 55075124eeda6..48f2214cae909 100644 --- a/IOPool/Streamer/test/NewStreamOut_cfg.py +++ b/IOPool/Streamer/test/NewStreamOut_cfg.py @@ -35,7 +35,8 @@ process.m2 = cms.EDProducer("NonProducer") process.a1 = cms.EDAnalyzer("StreamThingAnalyzer", - product_to_get = cms.string('m1') + product_to_get = cms.string('m1'), + outChecksum = cms.untracked.string('out') ) process.out = cms.OutputModule("EventStreamFileWriter", diff --git a/IOPool/Streamer/test/RunSimple_NewStreamer.sh b/IOPool/Streamer/test/RunSimple_NewStreamer.sh index d890912ecd071..c17a96adbc638 100755 --- a/IOPool/Streamer/test/RunSimple_NewStreamer.sh +++ b/IOPool/Streamer/test/RunSimple_NewStreamer.sh @@ -1,6 +1,6 @@ #!/bin/bash -function die { echo Failure $1: status $2 ; exit $2 ; } +function die { echo Failure $1: status $2 ; echo ""; cat log ; exit $2 ; } if [ -z $LOCAL_TEST_DIR ]; then LOCAL_TEST_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" @@ -28,48 +28,26 @@ mkdir ${OUTDIR} cp *_cfg.py ${OUTDIR} cd ${OUTDIR} -cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > out 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutAlt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outAlt 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutExt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outExt 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > outExt 2>&1 || die "cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? -cmsRun --parameter-set NewStreamIn_cfg.py > in 2>&1 || die "cmsRun NewStreamIn_cfg.py" $? -cmsRun --parameter-set NewStreamIn2_cfg.py > in2 2>&1 || die "cmsRun NewStreamIn2_cfg.py" $? -cmsRun --parameter-set NewStreamCopy_cfg.py > copy 2>&1 || die "cmsRun NewStreamCopy_cfg.py" $? -cmsRun --parameter-set NewStreamCopy2_cfg.py > copy2 2>&1 || die "cmsRun NewStreamCopy2_cfg.py" $? -cmsRun --parameter-set NewStreamInAlt_cfg.py > alt 2>&1 || die "cmsRun NewStreamInAlt_cfg.py" $? -cmsRun --parameter-set NewStreamInExt_cfg.py > ext 2>&1 || die "cmsRun NewStreamInExt_cfg.py" $? -cmsRun --parameter-set NewStreamInExtBuf_cfg.py > ext 2>&1 || die "cmsRun NewStreamInExtBuf_cfg.py" $? +cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutAlt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutExt_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOut_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOutExt2_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamOutPadding_cfg.py compAlgo=${TEST_COMPRESSION_ALGO} > log 2>&1 || die "cmsRun NewStreamOutPadding_cfg.py compAlgo=${TEST_COMPRESSION_ALGO}" $? +cmsRun NewStreamIn_cfg.py > log 2>&1 || die "cmsRun NewStreamIn_cfg.py" $? +cmsRun NewStreamIn2_cfg.py > log 2>&1 || die "cmsRun NewStreamIn2_cfg.py" $? +cmsRun NewStreamCopy_cfg.py > log 2>&1 || die "cmsRun NewStreamCopy_cfg.py" $? +cmsRun NewStreamCopy2_cfg.py > log 2>&1 || die "cmsRun NewStreamCopy2_cfg.py" $? +cmsRun NewStreamInAlt_cfg.py > log 2>&1 || die "cmsRun NewStreamInAlt_cfg.py" $? +cmsRun NewStreamInExt_cfg.py > log 2>&1 || die "cmsRun NewStreamInExt_cfg.py" $? +cmsRun NewStreamInExtBuf_cfg.py > log 2>&1 || die "cmsRun NewStreamInExtBuf_cfg.py" $? +cmsRun NewStreamInPadding_cfg.py > log 2>&1 || die "cmsRun NewStreamInPadding_cfg.py (1)" $? +cmsRun NewStreamInPadding_cfg.py inChecksum=outPadded > log 2>&1 || die "cmsRun NewStreamInPadding_cfg.py (2)" $? # echo "CHECKSUM = 1" > out -# echo "CHECKSUM = 1" > in -ANS_OUT_SIZE=`grep -c CHECKSUM out` -ANS_OUT=`grep CHECKSUM out` -ANS_IN=`grep CHECKSUM in` -ANS_IN2=`grep CHECKSUM in2` -ANS_COPY=`grep CHECKSUM copy` +if [ ! -s out ]; then -if [ "${ANS_OUT_SIZE}" == "0" ] -then - echo "New Stream Test Failed (out was not created)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_IN}" ] -then - echo "New Stream Test Failed (out!=in)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_IN2}" ] -then - echo "New Stream Test Failed (out!=in2)" - RC=1 -fi - -if [ "${ANS_OUT}" != "${ANS_COPY}" ] -then - echo "New Stream Test Failed (copy!=out)" + echo "New Stream Test Failed (out was not created or is empty)" RC=1 fi diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.cc b/IOPool/Streamer/test/StreamThingAnalyzer.cc index d019109c3483a..93bb56384df8d 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.cc +++ b/IOPool/Streamer/test/StreamThingAnalyzer.cc @@ -22,6 +22,8 @@ namespace edmtest_thing { : name_(ps.getParameter("product_to_get")), total_(), out_("gennums.txt"), + inChecksumFile_(ps.getUntrackedParameter("inChecksum", "")), + outChecksumFile_(ps.getUntrackedParameter("outChecksum", "")), cnt_(), getterUsingLabel_(edm::ModuleLabelMatch(name_), this) { callWhenNewProductsRegistered(getterUsingLabel_); @@ -37,6 +39,23 @@ namespace edmtest_thing { StreamThingAnalyzer::~StreamThingAnalyzer() { std::cout << "\nSTREAMTHING_CHECKSUM " << total_ << "\n" << std::endl; } + void StreamThingAnalyzer::endJob() { + edm::LogInfo("StreamThingAnalyzer") << "STREAMTHING_CHECKSUM " << total_; + if (!outChecksumFile_.empty()) { + edm::LogInfo("StreamThingAnalyzer") << "Writing checksum to " << outChecksumFile_; + std::ofstream outChecksum(outChecksumFile_); + outChecksum << total_; + } else if (!inChecksumFile_.empty()) { + edm::LogInfo("StreamThingAnalyzer") << "Reading checksum from " << inChecksumFile_; + int totalIn = 0; + std::ifstream inChecksum(inChecksumFile_); + inChecksum >> totalIn; + if (totalIn != total_) + throw cms::Exception("StreamThingAnalyzer") + << "Checksum mismatch input: " << totalIn << " compared to: " << total_; + } + } + void StreamThingAnalyzer::analyze(edm::Event const& e, edm::EventSetup const&) { typedef std::vector > ProdList; ProdList prod; diff --git a/IOPool/Streamer/test/StreamThingAnalyzer.h b/IOPool/Streamer/test/StreamThingAnalyzer.h index 97a4d82024b9a..30af1f2c5e4b7 100644 --- a/IOPool/Streamer/test/StreamThingAnalyzer.h +++ b/IOPool/Streamer/test/StreamThingAnalyzer.h @@ -24,12 +24,16 @@ namespace edmtest_thing { ~StreamThingAnalyzer() override; + void endJob() override; + void analyze(edm::Event const& e, edm::EventSetup const& c) override; private: std::string name_; int total_; std::ofstream out_; + std::string inChecksumFile_; + std::string outChecksumFile_; int cnt_; edm::GetterOfProducts getterUsingLabel_; };