Skip to content

Commit

Permalink
Merge pull request #41186 from smorovic/13_0_X-streamer-padding-backp…
Browse files Browse the repository at this point in the history
…ort-41155

(13_0_X) streamer format padding backport
  • Loading branch information
cmsbuild authored Mar 28, 2023
2 parents 4908ace + c350aec commit 22623e1
Show file tree
Hide file tree
Showing 20 changed files with 387 additions and 66 deletions.
24 changes: 23 additions & 1 deletion CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ mkdir ${OUTDIR}
cp *_cfg.py ${OUTDIR}
cd ${OUTDIR}

mkdir inDir
cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
cp teststreamfile.dat teststreamfile.padding
mv teststreamfile.dat inDir/
timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?
rm -rf inDir

mkdir inDir
cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
cp teststreamfile.dat teststreamfile.original
Expand Down Expand Up @@ -50,6 +57,10 @@ ANS_OUT_SIZE=`grep -c CHECKSUM out`
ANS_OUT=`grep CHECKSUM out`
ANS_IN=`grep CHECKSUM in`

ANS_OUTP_SIZE=`grep -c CHECKSUM outp`
ANS_OUTP=`grep CHECKSUM outp`
ANS_INP=`grep CHECKSUM inp`

if [ "${ANS_OUT_SIZE}" == "0" ]
then
echo "New Stream Test Failed (out was not created)"
Expand All @@ -62,5 +73,16 @@ then
RC=1
fi

#rm -rf ${OUTDIR}
if [ "${ANS_OUTP_SIZE}" == "0" ]
then
echo "New Stream Test Failed (out was not created)"
RC=1
fi

if [ "${ANS_OUTP}" != "${ANS_INP}" ]
then
echo "New Stream Test Failed (out!=in)"
RC=1
fi

exit ${RC}
51 changes: 51 additions & 0 deletions CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing

options = VarParsing.VarParsing('analysis')

options.register ('compAlgo',
'ZLIB', # default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"Compression Algorithm")

options.parseArguments()


process = cms.Process("HLT")

import FWCore.Framework.test.cmsExceptionsFatal_cff
process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options

process.load("FWCore.MessageLogger.MessageLogger_cfi")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(50)
)

process.source = cms.Source("EmptySource",
firstEvent = cms.untracked.uint64(10123456789)
)

process.m1 = cms.EDProducer("StreamThingProducer",
instance_count = cms.int32(5),
array_size = cms.int32(2)
)

process.m2 = cms.EDProducer("NonProducer")

process.a1 = cms.EDAnalyzer("StreamThingAnalyzer",
product_to_get = cms.string('m1')
)

process.out = cms.OutputModule("EventStreamFileWriter",
fileName = cms.untracked.string('teststreamfile.dat'),
padding = cms.untracked.uint32(4096),
compression_level = cms.untracked.int32(1),
use_compression = cms.untracked.bool(True),
compression_algorithm = cms.untracked.string(options.compAlgo),
max_event_size = cms.untracked.int32(7000000)
)

process.p1 = cms.Path(process.m1*process.a1*process.m2)
process.end = cms.EndPath(process.out)
29 changes: 28 additions & 1 deletion DQMServices/StreamerIO/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,20 @@ mkdir ${OUTDIR}
cp *_cfg.py ${OUTDIR}
cd ${OUTDIR}

rm -rf run000001
mkdir run000001

#test checksum with padded format
echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn
cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
mv teststreamfile.dat run000001/teststreamfile.dat
cmsRun streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $?
cmsRun streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?

rm -rf run000001
mkdir run000001

#the initial json file to read
echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn
cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
Expand All @@ -48,6 +61,9 @@ timeout --signal SIGTERM 180 cmsRun streamInExt_cfg.py > ext 2>&1 || die "cmsR
ANS_OUT_SIZE=`grep -c CHECKSUM out`
ANS_OUT=`grep CHECKSUM out`
ANS_IN=`grep CHECKSUM in`
ANS_OUTP_SIZE=`grep -c CHECKSUM outp`
ANS_OUTP=`grep CHECKSUM outp`
ANS_INP=`grep CHECKSUM inp`

if [ "${ANS_OUT_SIZE}" == "0" ]
then
Expand All @@ -61,5 +77,16 @@ then
RC=1
fi

#rm -rf ${OUTDIR}
if [ "${ANS_OUTP_SIZE}" == "0" ]
then
echo "New Stream Padded Test Failed (out was not created)"
RC=1
fi

if [ "${ANS_OUTP}" != "${ANS_INP}" ]
then
echo "New Stream Padded Test Failed (out!=in)"
RC=1
fi

exit ${RC}
51 changes: 51 additions & 0 deletions DQMServices/StreamerIO/test/streamOutPadding_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing

options = VarParsing.VarParsing('analysis')

options.register ('compAlgo',
'ZLIB', # default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"Compression Algorithm")

options.parseArguments()


process = cms.Process("HLT")

import FWCore.Framework.test.cmsExceptionsFatal_cff
process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options

process.load("FWCore.MessageLogger.MessageLogger_cfi")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(50)
)

process.source = cms.Source("EmptySource",
firstEvent = cms.untracked.uint64(10123456789)
)

process.m1 = cms.EDProducer("StreamThingProducer",
instance_count = cms.int32(5),
array_size = cms.int32(2)
)

process.m2 = cms.EDProducer("NonProducer")

process.a1 = cms.EDAnalyzer("StreamThingAnalyzer",
product_to_get = cms.string('m1')
)

process.out = cms.OutputModule("EventStreamFileWriter",
fileName = cms.untracked.string('teststreamfile.dat'),
padding = cms.untracked.uint32(4096),
compression_level = cms.untracked.int32(1),
use_compression = cms.untracked.bool(True),
compression_algorithm = cms.untracked.string(options.compAlgo),
max_event_size = cms.untracked.int32(7000000)
)

process.p1 = cms.Path(process.m1*process.a1*process.m2)
process.end = cms.EndPath(process.out)
3 changes: 2 additions & 1 deletion IOPool/Streamer/interface/MsgHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct Header {
ERROR_EVENT = 14,
FILE_CLOSE_REQUEST = 15,
SPARE1 = 16,
SPARE2 = 17
SPARE2 = 17,
PADDING = 255 //reserved for padding
};
};

Expand Down
7 changes: 5 additions & 2 deletions IOPool/Streamer/interface/StreamerFileIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ namespace edm::streamer {
*/
{
public:
explicit OutputFile(const std::string& name);
explicit OutputFile(const std::string& name, uint32 padding = 0);
/**
CTOR, takes file path name as argument
*/
~OutputFile();

bool write(const char* ptr, size_t n);
bool write(const char* ptr, size_t n, bool doPadding = false);
bool writePadding();

std::string fileName() const { return filename_; }
uint64 current_offset() const { return current_offset_; }
Expand All @@ -42,9 +43,11 @@ namespace edm::streamer {
bool do_adler_;
uint32 adlera_;
uint32 adlerb_;
uint32 padding_;

edm::propagate_const<std::shared_ptr<std::ofstream>> ost_;
std::string filename_;
std::unique_ptr<char[]> paddingBuf_;
};
} // namespace edm::streamer
#endif
2 changes: 1 addition & 1 deletion IOPool/Streamer/interface/StreamerOutputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class StreamerOutputFile
*/
{
public:
explicit StreamerOutputFile(const std::string& name);
explicit StreamerOutputFile(const std::string& name, uint32 padding = 0);
/**
CTOR, takes file path name as argument
*/
Expand Down
32 changes: 29 additions & 3 deletions IOPool/Streamer/src/StreamerFileIO.cc
Original file line number Diff line number Diff line change
@@ -1,35 +1,61 @@
#include "IOPool/Streamer/interface/StreamerFileIO.h"
#include <fstream>
#include <iostream>
#include <cstring>
#include "FWCore/Utilities/interface/Adler32Calculator.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "IOPool/Streamer/interface/MsgHeader.h"

namespace edm::streamer {
OutputFile::OutputFile(const std::string& name)
OutputFile::OutputFile(const std::string& name, uint32 padding)
: current_offset_(1),
do_adler_(false),
adlera_(1),
adlerb_(0),
padding_(padding),
ost_(new std::ofstream(name.c_str(), std::ios_base::binary | std::ios_base::out)),
filename_(name) {
if (!ost_->is_open()) {
throw cms::Exception("OutputFile", "OutputFile") << "Error Opening Output File: " << name << "\n";
}
ost_->rdbuf()->pubsetbuf(nullptr, 0);
if (padding_) {
paddingBuf_ = std::make_unique<char[]>(padding_);
memset(paddingBuf_.get(), Header::PADDING, padding_);
}
}

OutputFile::~OutputFile() { ost_->close(); }

bool OutputFile::write(const char* ptr, size_t n) {
bool OutputFile::write(const char* ptr, size_t n, bool doPadding) {
ost_->write(ptr, n);
if (!ost_->fail()) {
current_offset_ += (uint64)(n);
if (do_adler_)
cms::Adler32(ptr, n, adlera_, adlerb_);
if (doPadding && padding_) {
return writePadding();
}
return false;
}
return true;
}

void OutputFile::close() { ost_->close(); }
bool OutputFile::writePadding() {
uint64 mod = ost_->tellp() % padding_;
if (mod) {
uint32 rem = padding_ - (uint32)(mod % padding_);
bool ret = write(paddingBuf_.get(), rem, false);
return ret;
}
return false;
}

void OutputFile::close() {
if (padding_)
if (writePadding())
throw cms::Exception("OutputFile", "OutputFile")
<< "Error writing padding to the output file: " << filename_ << ": " << std::strerror(errno);
ost_->close();
}
} // namespace edm::streamer
5 changes: 4 additions & 1 deletion IOPool/Streamer/src/StreamerFileWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace edm {
StreamerFileWriter::StreamerFileWriter(edm::ParameterSet const& ps)
: stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter<std::string>("fileName"))) {}
: stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter<std::string>("fileName"),
ps.getUntrackedParameter<unsigned int>("padding"))) {}

StreamerFileWriter::StreamerFileWriter(std::string const& fileName)
: stream_writer_(new StreamerOutputFile(fileName)) {}
Expand Down Expand Up @@ -34,5 +35,7 @@ namespace edm {
void StreamerFileWriter::fillDescription(ParameterSetDescription& desc) {
desc.setComment("Writes events into a streamer output file.");
desc.addUntracked<std::string>("fileName", "teststreamfile.dat")->setComment("Name of output file.");
desc.addUntracked<unsigned int>("padding", 0)
->setComment("For testing: INIT and event block size will be rounded to this size padded with 0xff bytes.");
}
} //namespace edm
Loading

0 comments on commit 22623e1

Please sign in to comment.