Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(13_0_X) streamer format padding backport #41186

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion CalibCalorimetry/EcalLaserSorting/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ mkdir ${OUTDIR}
cp *_cfg.py ${OUTDIR}
cd ${OUTDIR}

mkdir inDir
cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
cp teststreamfile.dat teststreamfile.padding
mv teststreamfile.dat inDir/
timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?
rm -rf inDir

mkdir inDir
cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
cp teststreamfile.dat teststreamfile.original
Expand Down Expand Up @@ -50,6 +57,10 @@ ANS_OUT_SIZE=`grep -c CHECKSUM out`
ANS_OUT=`grep CHECKSUM out`
ANS_IN=`grep CHECKSUM in`

ANS_OUTP_SIZE=`grep -c CHECKSUM outp`
ANS_OUTP=`grep CHECKSUM outp`
ANS_INP=`grep CHECKSUM inp`

if [ "${ANS_OUT_SIZE}" == "0" ]
then
echo "New Stream Test Failed (out was not created)"
Expand All @@ -62,5 +73,16 @@ then
RC=1
fi

#rm -rf ${OUTDIR}
if [ "${ANS_OUTP_SIZE}" == "0" ]
then
echo "New Stream Test Failed (out was not created)"
RC=1
fi

if [ "${ANS_OUTP}" != "${ANS_INP}" ]
then
echo "New Stream Test Failed (out!=in)"
RC=1
fi

exit ${RC}
51 changes: 51 additions & 0 deletions CalibCalorimetry/EcalLaserSorting/test/streamOutPadding_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing

options = VarParsing.VarParsing('analysis')

options.register ('compAlgo',
'ZLIB', # default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"Compression Algorithm")

options.parseArguments()


process = cms.Process("HLT")

import FWCore.Framework.test.cmsExceptionsFatal_cff
process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options

process.load("FWCore.MessageLogger.MessageLogger_cfi")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(50)
)

process.source = cms.Source("EmptySource",
firstEvent = cms.untracked.uint64(10123456789)
)

process.m1 = cms.EDProducer("StreamThingProducer",
instance_count = cms.int32(5),
array_size = cms.int32(2)
)

process.m2 = cms.EDProducer("NonProducer")

process.a1 = cms.EDAnalyzer("StreamThingAnalyzer",
product_to_get = cms.string('m1')
)

process.out = cms.OutputModule("EventStreamFileWriter",
fileName = cms.untracked.string('teststreamfile.dat'),
padding = cms.untracked.uint32(4096),
compression_level = cms.untracked.int32(1),
use_compression = cms.untracked.bool(True),
compression_algorithm = cms.untracked.string(options.compAlgo),
max_event_size = cms.untracked.int32(7000000)
)

process.p1 = cms.Path(process.m1*process.a1*process.m2)
process.end = cms.EndPath(process.out)
29 changes: 28 additions & 1 deletion DQMServices/StreamerIO/test/RunStreamer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,20 @@ mkdir ${OUTDIR}
cp *_cfg.py ${OUTDIR}
cd ${OUTDIR}

rm -rf run000001
mkdir run000001

#test checksum with padded format
echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn
cmsRun streamOutPadding_cfg.py > outp 2>&1 || die "cmsRun streamOutPadding_cfg.py" $?
mv teststreamfile.dat run000001/teststreamfile.dat
cmsRun streamOutAlt_cfg.py > outAlt 2>&1 || die "cmsRun streamOutAlt_cfg.py" $?
cmsRun streamOutExt_cfg.py > outExt 2>&1 || die "cmsRun streamOutExt_cfg.py" $?
timeout --signal SIGTERM 180 cmsRun streamIn_cfg.py > inp 2>&1 || die "cmsRun streamIn_cfg.py" $?

rm -rf run000001
mkdir run000001

#the initial json file to read
echo "{\"data\" :[10,10, \"teststreamfile.dat\"]}" > run000001/run1_ls1_test.jsn
cmsRun streamOut_cfg.py > out 2>&1 || die "cmsRun streamOut_cfg.py" $?
Expand All @@ -48,6 +61,9 @@ timeout --signal SIGTERM 180 cmsRun streamInExt_cfg.py > ext 2>&1 || die "cmsR
ANS_OUT_SIZE=`grep -c CHECKSUM out`
ANS_OUT=`grep CHECKSUM out`
ANS_IN=`grep CHECKSUM in`
ANS_OUTP_SIZE=`grep -c CHECKSUM outp`
ANS_OUTP=`grep CHECKSUM outp`
ANS_INP=`grep CHECKSUM inp`

if [ "${ANS_OUT_SIZE}" == "0" ]
then
Expand All @@ -61,5 +77,16 @@ then
RC=1
fi

#rm -rf ${OUTDIR}
if [ "${ANS_OUTP_SIZE}" == "0" ]
then
echo "New Stream Padded Test Failed (out was not created)"
RC=1
fi

if [ "${ANS_OUTP}" != "${ANS_INP}" ]
then
echo "New Stream Padded Test Failed (out!=in)"
RC=1
fi

exit ${RC}
51 changes: 51 additions & 0 deletions DQMServices/StreamerIO/test/streamOutPadding_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import FWCore.ParameterSet.Config as cms
import FWCore.ParameterSet.VarParsing as VarParsing

options = VarParsing.VarParsing('analysis')

options.register ('compAlgo',
'ZLIB', # default value
VarParsing.VarParsing.multiplicity.singleton,
VarParsing.VarParsing.varType.string,
"Compression Algorithm")

options.parseArguments()


process = cms.Process("HLT")

import FWCore.Framework.test.cmsExceptionsFatal_cff
process.options = FWCore.Framework.test.cmsExceptionsFatal_cff.options

process.load("FWCore.MessageLogger.MessageLogger_cfi")

process.maxEvents = cms.untracked.PSet(
input = cms.untracked.int32(50)
)

process.source = cms.Source("EmptySource",
firstEvent = cms.untracked.uint64(10123456789)
)

process.m1 = cms.EDProducer("StreamThingProducer",
instance_count = cms.int32(5),
array_size = cms.int32(2)
)

process.m2 = cms.EDProducer("NonProducer")

process.a1 = cms.EDAnalyzer("StreamThingAnalyzer",
product_to_get = cms.string('m1')
)

process.out = cms.OutputModule("EventStreamFileWriter",
fileName = cms.untracked.string('teststreamfile.dat'),
padding = cms.untracked.uint32(4096),
compression_level = cms.untracked.int32(1),
use_compression = cms.untracked.bool(True),
compression_algorithm = cms.untracked.string(options.compAlgo),
max_event_size = cms.untracked.int32(7000000)
)

process.p1 = cms.Path(process.m1*process.a1*process.m2)
process.end = cms.EndPath(process.out)
3 changes: 2 additions & 1 deletion IOPool/Streamer/interface/MsgHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct Header {
ERROR_EVENT = 14,
FILE_CLOSE_REQUEST = 15,
SPARE1 = 16,
SPARE2 = 17
SPARE2 = 17,
PADDING = 255 //reserved for padding
};
};

Expand Down
7 changes: 5 additions & 2 deletions IOPool/Streamer/interface/StreamerFileIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ namespace edm::streamer {
*/
{
public:
explicit OutputFile(const std::string& name);
explicit OutputFile(const std::string& name, uint32 padding = 0);
/**
CTOR, takes file path name as argument
*/
~OutputFile();

bool write(const char* ptr, size_t n);
bool write(const char* ptr, size_t n, bool doPadding = false);
bool writePadding();

std::string fileName() const { return filename_; }
uint64 current_offset() const { return current_offset_; }
Expand All @@ -42,9 +43,11 @@ namespace edm::streamer {
bool do_adler_;
uint32 adlera_;
uint32 adlerb_;
uint32 padding_;

edm::propagate_const<std::shared_ptr<std::ofstream>> ost_;
std::string filename_;
std::unique_ptr<char[]> paddingBuf_;
};
} // namespace edm::streamer
#endif
2 changes: 1 addition & 1 deletion IOPool/Streamer/interface/StreamerOutputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class StreamerOutputFile
*/
{
public:
explicit StreamerOutputFile(const std::string& name);
explicit StreamerOutputFile(const std::string& name, uint32 padding = 0);
/**
CTOR, takes file path name as argument
*/
Expand Down
32 changes: 29 additions & 3 deletions IOPool/Streamer/src/StreamerFileIO.cc
Original file line number Diff line number Diff line change
@@ -1,35 +1,61 @@
#include "IOPool/Streamer/interface/StreamerFileIO.h"
#include <fstream>
#include <iostream>
#include <cstring>
#include "FWCore/Utilities/interface/Adler32Calculator.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "IOPool/Streamer/interface/MsgHeader.h"

namespace edm::streamer {
OutputFile::OutputFile(const std::string& name)
OutputFile::OutputFile(const std::string& name, uint32 padding)
: current_offset_(1),
do_adler_(false),
adlera_(1),
adlerb_(0),
padding_(padding),
ost_(new std::ofstream(name.c_str(), std::ios_base::binary | std::ios_base::out)),
filename_(name) {
if (!ost_->is_open()) {
throw cms::Exception("OutputFile", "OutputFile") << "Error Opening Output File: " << name << "\n";
}
ost_->rdbuf()->pubsetbuf(nullptr, 0);
if (padding_) {
paddingBuf_ = std::make_unique<char[]>(padding_);
memset(paddingBuf_.get(), Header::PADDING, padding_);
}
}

OutputFile::~OutputFile() { ost_->close(); }

bool OutputFile::write(const char* ptr, size_t n) {
bool OutputFile::write(const char* ptr, size_t n, bool doPadding) {
ost_->write(ptr, n);
if (!ost_->fail()) {
current_offset_ += (uint64)(n);
if (do_adler_)
cms::Adler32(ptr, n, adlera_, adlerb_);
if (doPadding && padding_) {
return writePadding();
}
return false;
}
return true;
}

void OutputFile::close() { ost_->close(); }
bool OutputFile::writePadding() {
uint64 mod = ost_->tellp() % padding_;
if (mod) {
uint32 rem = padding_ - (uint32)(mod % padding_);
bool ret = write(paddingBuf_.get(), rem, false);
return ret;
}
return false;
}

void OutputFile::close() {
if (padding_)
if (writePadding())
throw cms::Exception("OutputFile", "OutputFile")
<< "Error writing padding to the output file: " << filename_ << ": " << std::strerror(errno);
ost_->close();
}
} // namespace edm::streamer
5 changes: 4 additions & 1 deletion IOPool/Streamer/src/StreamerFileWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

namespace edm {
StreamerFileWriter::StreamerFileWriter(edm::ParameterSet const& ps)
: stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter<std::string>("fileName"))) {}
: stream_writer_(new StreamerOutputFile(ps.getUntrackedParameter<std::string>("fileName"),
ps.getUntrackedParameter<unsigned int>("padding"))) {}

StreamerFileWriter::StreamerFileWriter(std::string const& fileName)
: stream_writer_(new StreamerOutputFile(fileName)) {}
Expand Down Expand Up @@ -34,5 +35,7 @@ namespace edm {
void StreamerFileWriter::fillDescription(ParameterSetDescription& desc) {
desc.setComment("Writes events into a streamer output file.");
desc.addUntracked<std::string>("fileName", "teststreamfile.dat")->setComment("Name of output file.");
desc.addUntracked<unsigned int>("padding", 0)
->setComment("For testing: INIT and event block size will be rounded to this size padded with 0xff bytes.");
}
} //namespace edm
Loading