Skip to content

Commit

Permalink
Merge pull request #41824 from smorovic/13_1_X-inputsource-checks
Browse files Browse the repository at this point in the history
 improved file size/buffer size checks in DAQ input source (13_1_X)
  • Loading branch information
cmsbuild authored Jun 1, 2023
2 parents 8ef3b40 + 76f53fa commit 61f1998
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 42 deletions.
4 changes: 4 additions & 0 deletions EventFilter/Utilities/interface/DAQSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class RawInputFile : public InputFile {
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
sourceParent_(parent) {}
bool advance(unsigned char*& dataPosition, const size_t size);
void advance(const size_t size) {
chunkPosition_ += size;
bufferPosition_ += size;
}

private:
DAQSource* sourceParent_;
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/interface/FedRawDataInputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class InputFile {
std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
}
uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
};

#endif // EventFilter_Utilities_FedRawDataInputSource_h
Expand Down
91 changes: 52 additions & 39 deletions EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
if (verifyChecksum_ && !dataMode_->checksumValid()) {
if (fms_)
fms_->setExceptionDetected(currentLumiSection_);
throw cms::Exception("DAQSource::getNextEvent") << dataMode_->getChecksumError();
throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
}
setMonState(inCachedEvent);

Expand All @@ -385,7 +385,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
currentFile_.reset();
return status;
} else if (status == evf::EvFDaqDirector::runAbort) {
throw cms::Exception("DAQSource::getNextEvent") << "Run has been aborted by the input source reader thread";
throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
} else if (status == evf::EvFDaqDirector::newLumi) {
setMonState(inNewLumi);
if (currentFile_->lumi_ > currentLumiSection_) {
Expand Down Expand Up @@ -423,7 +423,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
//release last chunk (it is never released elsewhere)
freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
throw cms::Exception("DAQSource::getNextEvent")
throw cms::Exception("DAQSource::getNextDataBlock")
<< "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
<< " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
}
Expand All @@ -445,7 +445,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading file header";
throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";

edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
if (currentFile_->lumi_ > currentLumiSection_) {
Expand All @@ -456,14 +456,14 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
}

//advance buffer position to skip file header (chunk will be acquired later)
currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
currentFile_->advance(currentFile_->rawHeaderSize_);
}

//file is too short
if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < dataMode_->headerSize()) {
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event header";
}
//file is too short to fit event header
if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
throw cms::Exception("DAQSource::getNextDataBlock")
<< "Premature end of input file while reading event header. Missing: "
<< (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";

//multibuffer mode
//wait for the current chunk to become added to the vector
Expand All @@ -475,55 +475,62 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
}
setMonState(inChunkReceived);

//check if header is at the boundary of two chunks
chunkIsFree_ = false;
bool chunkEnd;
unsigned char* dataPosition;

//read event header, copy it to a single chunk if necessary
bool chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());

//get buffer size of current chunk (can be resized)
uint64_t currentChunkSize = currentFile_->currentChunkSize();

//prepare view based on header that was read
dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);

//check that payload size is within the file
const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event data";
}
if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
throw cms::Exception("DAQSource::getNextEventDataBlock")
<< "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
<< ") while parsing block";

//for cross-buffer models
if (chunkEnd) {
//header was at the chunk boundary, we will have to move payload as well
//header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
//mark to release old chunk
chunkIsFree_ = true;
} else {
//header was contiguous, but check if payload fits the chunk
if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
//rewind to header start position
currentFile_->rewindChunk(dataMode_->headerSize());
//copy event to a chunk start and move pointers
} else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
//header was contiguous, but payload does not fit in the chunk
//rewind to header start position and then together with payload will be copied together to the old chunk
currentFile_->rewindChunk(dataMode_->headerSize());

setMonState(inWaitChunk);
setMonState(inWaitChunk);

//can already move buffer
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);

setMonState(inChunkReceived);
//do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
assert(chunkEnd);
//mark to release old chunk
chunkIsFree_ = true;

assert(chunkEnd);
chunkIsFree_ = true;
//header is moved
dataMode_->makeDataBlockView(
dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
} else {
//everything is in a single chunk, only move pointers forward
chunkEnd = currentFile_->advance(dataPosition, msgSize);
assert(!chunkEnd);
chunkIsFree_ = false;
}
setMonState(inChunkReceived);
//header and payload is moved, update view
dataMode_->makeDataBlockView(
dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
} else {
//everything is in a single chunk, only move pointers forward
chunkEnd = currentFile_->advance(dataPosition, msgSize);
assert(!chunkEnd);
chunkIsFree_ = false;
}

//sanity-check check that the buffer position has not exceeded file size after preparing event
if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
throw cms::Exception("DAQSource::getNextEventDataBlock")
<< "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);

//prepare event
return getNextEventFromDataBlock();
}
Expand Down Expand Up @@ -618,7 +625,12 @@ void DAQSource::readSupervisor() {
//sleep until woken up by condition or a timeout
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
counter++;
//if (!(counter%50)) edm::LogInfo("DAQSource") << "No free chunks or threads...";
if (!(counter % 6000)) {
edm::LogWarning("FedRawDataInputSource")
<< "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
<< ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
<< " / " << maxBufferedFiles_;
}
LogDebug("DAQSource") << "No free chunks or threads...";
} else {
assert(!workerPool_.empty() || freeChunks_.empty());
Expand Down Expand Up @@ -1329,6 +1341,7 @@ bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {

if (currentLeft < size) {
//we need next chunk
assert(chunks_.size() > currentChunk_ + 1);
while (!waitForChunk(currentChunk_ + 1)) {
sourceParent_->setMonState(inWaitChunk);
usleep(100000);
Expand Down
24 changes: 21 additions & 3 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
unsigned char* dataPosition;

//read header, copy it to a single chunk if necessary
if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Premature end of input file (missing:"
<< (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
<< ") while reading event data for next event header";
bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);

event_ = std::make_unique<FRDEventMsgView>(dataPosition);
Expand All @@ -552,9 +557,10 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {

const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
if (currentFile_->fileSizeLeft() < msgSize) {
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Premature end of input file while reading event data";
<< "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
<< ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
}

if (chunkEnd) {
Expand Down Expand Up @@ -585,6 +591,12 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
chunkIsFree_ = false;
}
}
//sanity-check check that the buffer position has not exceeded file size after preparing event
if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
<< " after reading last event declared size of " << event_->size() << " bytes";
}
} //end multibuffer mode
setMonState(inChecksumEvent);

Expand Down Expand Up @@ -800,7 +812,12 @@ void FedRawDataInputSource::readSupervisor() {
//sleep until woken up by condition or a timeout
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
counter++;
//if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
if (!(counter % 6000)) {
edm::LogWarning("FedRawDataInputSource")
<< "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
<< ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
<< " / " << maxBufferedFiles_;
}
LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
} else {
assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
Expand Down Expand Up @@ -1432,6 +1449,7 @@ inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size)

if (currentLeft < size) {
//we need next chunk
assert(chunks_.size() > currentChunk_ + 1);
while (!waitForChunk(currentChunk_ + 1)) {
parent_->setMonState(inWaitChunk);
usleep(100000);
Expand Down

0 comments on commit 61f1998

Please sign in to comment.