Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved file size/buffer size checks in DAQ input source (13_1_X) #41824

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions EventFilter/Utilities/interface/DAQSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class RawInputFile : public InputFile {
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
sourceParent_(parent) {}
bool advance(unsigned char*& dataPosition, const size_t size);
void advance(const size_t size) {
chunkPosition_ += size;
bufferPosition_ += size;
}

private:
DAQSource* sourceParent_;
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/interface/FedRawDataInputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class InputFile {
std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng);
}
uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; }
int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; }
};

#endif // EventFilter_Utilities_FedRawDataInputSource_h
Expand Down
91 changes: 52 additions & 39 deletions EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock() {
if (verifyChecksum_ && !dataMode_->checksumValid()) {
if (fms_)
fms_->setExceptionDetected(currentLumiSection_);
throw cms::Exception("DAQSource::getNextEvent") << dataMode_->getChecksumError();
throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
}
setMonState(inCachedEvent);

Expand All @@ -385,7 +385,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
currentFile_.reset();
return status;
} else if (status == evf::EvFDaqDirector::runAbort) {
throw cms::Exception("DAQSource::getNextEvent") << "Run has been aborted by the input source reader thread";
throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
} else if (status == evf::EvFDaqDirector::newLumi) {
setMonState(inNewLumi);
if (currentFile_->lumi_ > currentLumiSection_) {
Expand Down Expand Up @@ -423,7 +423,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
//release last chunk (it is never released elsewhere)
freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
throw cms::Exception("DAQSource::getNextEvent")
throw cms::Exception("DAQSource::getNextDataBlock")
<< "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
<< " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
}
Expand All @@ -445,7 +445,7 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading file header";
throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";

edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
if (currentFile_->lumi_ > currentLumiSection_) {
Expand All @@ -456,14 +456,14 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
}

//advance buffer position to skip file header (chunk will be acquired later)
currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
currentFile_->advance(currentFile_->rawHeaderSize_);
}

//file is too short
if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < dataMode_->headerSize()) {
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event header";
}
//file is too short to fit event header
if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
throw cms::Exception("DAQSource::getNextDataBlock")
<< "Premature end of input file while reading event header. Missing: "
<< (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";

//multibuffer mode
//wait for the current chunk to become added to the vector
Expand All @@ -475,55 +475,62 @@ evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock() {
}
setMonState(inChunkReceived);

//check if header is at the boundary of two chunks
chunkIsFree_ = false;
bool chunkEnd;
unsigned char* dataPosition;

//read event header, copy it to a single chunk if necessary
bool chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());

//get buffer size of current chunk (can be resized)
uint64_t currentChunkSize = currentFile_->currentChunkSize();

//prepare view based on header that was read
dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);

//check that payload size is within the file
const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event data";
}
if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
throw cms::Exception("DAQSource::getNextEventDataBlock")
<< "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
<< ") while parsing block";

//for cross-buffer models
if (chunkEnd) {
//header was at the chunk boundary, we will have to move payload as well
//header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
//mark to release old chunk
chunkIsFree_ = true;
} else {
//header was contiguous, but check if payload fits the chunk
if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
//rewind to header start position
currentFile_->rewindChunk(dataMode_->headerSize());
//copy event to a chunk start and move pointers
} else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
//header was contiguous, but payload does not fit in the chunk
//rewind to header start position and then together with payload will be copied together to the old chunk
currentFile_->rewindChunk(dataMode_->headerSize());

setMonState(inWaitChunk);
setMonState(inWaitChunk);

//can already move buffer
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);

setMonState(inChunkReceived);
//do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
assert(chunkEnd);
//mark to release old chunk
chunkIsFree_ = true;

assert(chunkEnd);
chunkIsFree_ = true;
//header is moved
dataMode_->makeDataBlockView(
dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
} else {
//everything is in a single chunk, only move pointers forward
chunkEnd = currentFile_->advance(dataPosition, msgSize);
assert(!chunkEnd);
chunkIsFree_ = false;
}
setMonState(inChunkReceived);
//header and payload is moved, update view
dataMode_->makeDataBlockView(
dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
} else {
//everything is in a single chunk, only move pointers forward
chunkEnd = currentFile_->advance(dataPosition, msgSize);
assert(!chunkEnd);
chunkIsFree_ = false;
}

//sanity-check check that the buffer position has not exceeded file size after preparing event
if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
throw cms::Exception("DAQSource::getNextEventDataBlock")
<< "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);

//prepare event
return getNextEventFromDataBlock();
}
Expand Down Expand Up @@ -618,7 +625,12 @@ void DAQSource::readSupervisor() {
//sleep until woken up by condition or a timeout
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
counter++;
//if (!(counter%50)) edm::LogInfo("DAQSource") << "No free chunks or threads...";
if (!(counter % 6000)) {
edm::LogWarning("FedRawDataInputSource")
<< "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
<< ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
<< " / " << maxBufferedFiles_;
}
LogDebug("DAQSource") << "No free chunks or threads...";
} else {
assert(!workerPool_.empty() || freeChunks_.empty());
Expand Down Expand Up @@ -1329,6 +1341,7 @@ bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {

if (currentLeft < size) {
//we need next chunk
assert(chunks_.size() > currentChunk_ + 1);
while (!waitForChunk(currentChunk_ + 1)) {
sourceParent_->setMonState(inWaitChunk);
usleep(100000);
Expand Down
24 changes: 21 additions & 3 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
unsigned char* dataPosition;

//read header, copy it to a single chunk if necessary
if (currentFile_->fileSizeLeft() < FRDHeaderVersionSize[detectedFRDversion_])
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Premature end of input file (missing:"
<< (FRDHeaderVersionSize[detectedFRDversion_] - currentFile_->fileSizeLeft())
<< ") while reading event data for next event header";
bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);

event_ = std::make_unique<FRDEventMsgView>(dataPosition);
Expand All @@ -552,9 +557,10 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {

const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];

if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
if (currentFile_->fileSizeLeft() < msgSize) {
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Premature end of input file while reading event data";
<< "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
<< ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
}

if (chunkEnd) {
Expand Down Expand Up @@ -585,6 +591,12 @@ inline evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent() {
chunkIsFree_ = false;
}
}
//sanity-check check that the buffer position has not exceeded file size after preparing event
if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
throw cms::Exception("FedRawDataInputSource::getNextEvent")
<< "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
<< " after reading last event declared size of " << event_->size() << " bytes";
}
} //end multibuffer mode
setMonState(inChecksumEvent);

Expand Down Expand Up @@ -800,7 +812,12 @@ void FedRawDataInputSource::readSupervisor() {
//sleep until woken up by condition or a timeout
if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
counter++;
//if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
if (!(counter % 6000)) {
edm::LogWarning("FedRawDataInputSource")
<< "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
<< ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
<< " / " << maxBufferedFiles_;
}
LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
} else {
assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
Expand Down Expand Up @@ -1432,6 +1449,7 @@ inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size)

if (currentLeft < size) {
//we need next chunk
assert(chunks_.size() > currentChunk_ + 1);
while (!waitForChunk(currentChunk_ + 1)) {
parent_->setMonState(inWaitChunk);
usleep(100000);
Expand Down