From 9638a10f30cd76c56422cb089cb8712af60e00d3 Mon Sep 17 00:00:00 2001
From: Srecko Morovic
Date: Tue, 30 May 2023 22:08:31 +0200
Subject: [PATCH 1/2] improved file size/buffer size checks in DAQSource

- add RawInputFile::advance(size), which moves the chunk position and the
  buffer position forward together
- use the signed fileSizeLeft() when checking that the event header and the
  payload fit in the rest of the file, and report how many bytes are missing
- throw if the buffer position is past the file size after an event has
  been prepared
- name exception categories after the function that throws
- assert that the next chunk exists before waiting for it
- log a warning every 6000 wait timeouts while no free chunk or reader
  thread is available

Note: fileSizeLeft() is added to InputFile by patch 2/2, so this patch does
not build on its own; apply both patches together.
---
 EventFilter/Utilities/interface/DAQSource.h |  4 +
 EventFilter/Utilities/src/DAQSource.cc      | 91 ++++++++++++---------
 2 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/EventFilter/Utilities/interface/DAQSource.h b/EventFilter/Utilities/interface/DAQSource.h
index 4d808c9de88b0..2b35a85837205 100644
--- a/EventFilter/Utilities/interface/DAQSource.h
+++ b/EventFilter/Utilities/interface/DAQSource.h
@@ -192,6 +192,10 @@ class RawInputFile : public InputFile {
       : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
         sourceParent_(parent) {}
   bool advance(unsigned char*& dataPosition, const size_t size);
+  void advance(const size_t size) {
+    chunkPosition_ += size;
+    bufferPosition_ += size;
+  }
 
 private:
   DAQSource* sourceParent_;
diff --git a/EventFilter/Utilities/src/DAQSource.cc b/EventFilter/Utilities/src/DAQSource.cc
index 36dc59356db75..1f1e1972d33ae 100644
--- a/EventFilter/Utilities/src/DAQSource.cc
+++ b/EventFilter/Utilities/src/DAQSource.cc
@@ -358,7 +358,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
   if (verifyChecksum_ && !dataMode_->checksumValid()) {
     if (fms_)
       fms_->setExceptionDetected(currentLumiSection_);
-    throw cms::Exception("DAQSource::getNextEvent") << dataMode_->getChecksumError();
+    throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
   }
   setMonState(inCachedEvent);
 
@@ -385,7 +385,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
       currentFile_.reset();
       return status;
     } else if (status == evf::EvFDaqDirector::runAbort) {
-      throw cms::Exception("DAQSource::getNextEvent") << "Run has been aborted by the input source reader thread";
+      throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
     } else if (status == evf::EvFDaqDirector::newLumi) {
       setMonState(inNewLumi);
       if (currentFile_->lumi_ > currentLumiSection_) {
@@ -423,7 +423,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
     //release last chunk (it is never released elsewhere)
     freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
     if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
-      throw cms::Exception("DAQSource::getNextEvent")
+      throw cms::Exception("DAQSource::getNextDataBlock")
           << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
           << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
     }
@@ -445,7 +445,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
   if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
     if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
       if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
-        throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading file header";
+        throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
 
       edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
       if (currentFile_->lumi_ > currentLumiSection_) {
@@ -456,14 +456,14 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
     }
 
     //advance buffer position to skip file header (chunk will be acquired later)
-    currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
-    currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
+    currentFile_->advance(currentFile_->rawHeaderSize_);
   }
 
-  //file is too short
-  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < dataMode_->headerSize()) {
-    throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event header";
-  }
+  //file is too short to fit event header
+  if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
+    throw cms::Exception("DAQSource::getNextDataBlock")
+        << "Premature end of input file while reading event header. Missing: "
+        << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
 
   //multibuffer mode
   //wait for the current chunk to become added to the vector
@@ -475,55 +475,62 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
   }
   setMonState(inChunkReceived);
 
-  //check if header is at the boundary of two chunks
   chunkIsFree_ = false;
+  bool chunkEnd;
   unsigned char* dataPosition;
 
   //read event header, copy it to a single chunk if necessary
-  bool chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
+  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
 
   //get buffer size of current chunk (can be resized)
   uint64_t currentChunkSize = currentFile_->currentChunkSize();
 
+  //prepare view based on header that was read
   dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
 
+  //check that payload size is within the file
   const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
 
-  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
-    throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event data";
-  }
+  if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
+    throw cms::Exception("DAQSource::getNextDataBlock")
+        << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
+        << ") while parsing block";
 
   //for cross-buffer models
   if (chunkEnd) {
-    //header was at the chunk boundary, we will have to move payload as well
+    //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
     currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
+    //mark to release old chunk
     chunkIsFree_ = true;
-  } else {
-    //header was contiguous, but check if payload fits the chunk
-    if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
-      //rewind to header start position
-      currentFile_->rewindChunk(dataMode_->headerSize());
-      //copy event to a chunk start and move pointers
+  } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
+    //header was contiguous, but payload does not fit in the chunk
+    //rewind to header start position and then together with payload will be copied together to the old chunk
+    currentFile_->rewindChunk(dataMode_->headerSize());
 
-      setMonState(inWaitChunk);
+    setMonState(inWaitChunk);
 
-      //can already move buffer
-      chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
-
-      setMonState(inChunkReceived);
+    //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
+    chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
+    assert(chunkEnd);
+    //mark to release old chunk
+    chunkIsFree_ = true;
 
-      assert(chunkEnd);
-      chunkIsFree_ = true;
-      //header is moved
-      dataMode_->makeDataBlockView(
-          dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
-    } else {
-      //everything is in a single chunk, only move pointers forward
-      chunkEnd = currentFile_->advance(dataPosition, msgSize);
-      assert(!chunkEnd);
-      chunkIsFree_ = false;
-    }
+    setMonState(inChunkReceived);
+    //header and payload is moved, update view
+    dataMode_->makeDataBlockView(
+        dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
+  } else {
+    //everything is in a single chunk, only move pointers forward
+    chunkEnd = currentFile_->advance(dataPosition, msgSize);
+    assert(!chunkEnd);
+    chunkIsFree_ = false;
   }
+
+  //sanity check that the buffer position has not exceeded file size after preparing event
+  if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
+    throw cms::Exception("DAQSource::getNextDataBlock")
+        << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
+
   //prepare event
   return getNextEventFromDataBlock();
 }
@@ -618,7 +625,12 @@ void DAQSource::readSupervisor() {
       //sleep until woken up by condition or a timeout
       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
         counter++;
-        //if (!(counter%50)) edm::LogInfo("DAQSource") << "No free chunks or threads...";
+        if (!(counter % 6000)) {
+          edm::LogWarning("DAQSource")
+              << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
+              << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
+              << " / " << maxBufferedFiles_;
+        }
         LogDebug("DAQSource") << "No free chunks or threads...";
       } else {
         assert(!workerPool_.empty() || freeChunks_.empty());
@@ -1329,6 +1341,7 @@ bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
 
   if (currentLeft < size) {
     //we need next chunk
+    assert(chunks_.size() > currentChunk_ + 1);
     while (!waitForChunk(currentChunk_ + 1)) {
       sourceParent_->setMonState(inWaitChunk);
       usleep(100000);

From 76f53fa8352da5c236ef8b4c374a0317c9fe2705 Mon Sep 17 00:00:00 2001
From: Srecko Morovic
Date: Tue, 30 May 2023 22:22:40 +0200
Subject: [PATCH 2/2] FEDRawDataInputSource changes

- add InputFile::fileSizeLeft(), the signed difference between the file
  size and the current buffer position
- check that the event header fits in the rest of the file before reading it
- report the missing byte count, event and lumi when the payload is cut short
- throw if the buffer position is past the file size after an event has
  been prepared
- assert that the next chunk exists before waiting for it
- log a warning every 6000 wait timeouts while no free chunk or reader
  thread is available
---
 .../interface/FedRawDataInputSource.h         |  1 +
 .../Utilities/src/FedRawDataInputSource.cc    | 24 ++++++++++++++++---
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/EventFilter/Utilities/interface/FedRawDataInputSource.h b/EventFilter/Utilities/interface/FedRawDataInputSource.h
index 62333475f1287..dea0500813209 100644
--- a/EventFilter/Utilities/interface/FedRawDataInputSource.h
+++ b/EventFilter/Utilities/interface/FedRawDataInputSource.h
@@ -312,6 +312,7 @@ class InputFile {
     std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
   }
   uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
+  int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
 };
 
 #endif  // EventFilter_Utilities_FedRawDataInputSource_h
diff --git a/EventFilter/Utilities/src/FedRawDataInputSource.cc b/EventFilter/Utilities/src/FedRawDataInputSource.cc
index abe9072eae84c..4f60680f43c8c 100644
--- a/EventFilter/Utilities/src/FedRawDataInputSource.cc
+++ b/EventFilter/Utilities/src/FedRawDataInputSource.cc
@@ -540,6 +540,11 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
     unsigned char* dataPosition;
 
     //read header, copy it to a single chunk if necessary
+    if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
+      throw cms::Exception("FedRawDataInputSource::getNextEvent")
+          << "Premature end of input file (missing:"
+          << (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
+          << ") while reading event data for next event header";
     bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
 
     event_ = std::make_unique<FRDEventMsgView>(dataPosition);
@@ -552,9 +557,10 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
 
     const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
 
-    if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
+    if (currentFile_->fileSizeLeft() < msgSize) {
       throw cms::Exception("FedRawDataInputSource::getNextEvent")
-          << "Premature end of input file while reading event data";
+          << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
+          << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
     }
 
     if (chunkEnd) {
@@ -585,6 +591,12 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
         chunkIsFree_ = false;
       }
     }
+    //sanity check that the buffer position has not exceeded file size after preparing event
+    if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
+      throw cms::Exception("FedRawDataInputSource::getNextEvent")
+          << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
+          << " after reading last event declared size of " << event_->size() << " bytes";
+    }
   }  //end multibuffer mode
   setMonState(inChecksumEvent);
 
@@ -800,7 +812,12 @@ void FedRawDataInputSource::readSupervisor() {
       //sleep until woken up by condition or a timeout
       if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
         counter++;
-        //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
+        if (!(counter % 6000)) {
+          edm::LogWarning("FedRawDataInputSource")
+              << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
+              << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
+              << " / " << maxBufferedFiles_;
+        }
         LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
       } else {
         assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
@@ -1432,6 +1449,7 @@ inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size)
 
   if (currentLeft < size) {
     //we need next chunk
+    assert(chunks_.size() > currentChunk_ + 1);
     while (!waitForChunk(currentChunk_ + 1)) {
       parent_->setMonState(inWaitChunk);
       usleep(100000);