Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAQ] fix unsafe vector handling in input sources (13_1_X) #41669

Merged
merged 2 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion EventFilter/Utilities/interface/DAQSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class DAQSource : public edm::RawInputSource {
tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;

std::mutex mReader_;
std::vector<std::condition_variable*> cvReader_;
std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
std::vector<unsigned int> tid_active_;

std::atomic<bool> quit_threads_;
Expand Down
2 changes: 1 addition & 1 deletion EventFilter/Utilities/interface/FedRawDataInputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class FedRawDataInputSource : public edm::RawInputSource {
tbb::concurrent_queue<std::unique_ptr<InputFile>> fileQueue_;

std::mutex mReader_;
std::vector<std::condition_variable*> cvReader_;
std::vector<std::unique_ptr<std::condition_variable>> cvReader_;
std::vector<unsigned int> tid_active_;

std::atomic<bool> quit_threads_;
Expand Down
21 changes: 8 additions & 13 deletions EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,18 @@ DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription

quit_threads_ = false;

//prepare data shared by threads
for (unsigned int i = 0; i < numConcurrentReads_; i++) {
std::unique_lock<std::mutex> lk(startupLock_);
//issue a memory fence here and in threads (constructor was segfaulting without this)
thread_quit_signal.push_back(false);
workerJob_.push_back(ReaderInfo(nullptr, nullptr));
cvReader_.push_back(new std::condition_variable);
cvReader_.push_back(std::make_unique<std::condition_variable>());
tid_active_.push_back(0);
threadInit_.store(false, std::memory_order_release);
}

//start threads
for (unsigned int i = 0; i < numConcurrentReads_; i++) {
//wait for each thread to complete initialization
std::unique_lock<std::mutex> lk(startupLock_);
workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
startupCv_.wait(lk);
}
Expand Down Expand Up @@ -211,15 +215,6 @@ DAQSource::~DAQSource() {
delete workerThreads_[i];
}
}
for (unsigned int i = 0; i < numConcurrentReads_; i++)
delete cvReader_[i];
/*
for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
InputChunk *ch;
while (!freeChunks_.try_pop(ch)) {}
delete ch;
}
*/
}

void DAQSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
Expand Down
24 changes: 8 additions & 16 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,18 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm:

quit_threads_ = false;

//prepare data shared by threads
for (unsigned int i = 0; i < numConcurrentReads_; i++) {
std::unique_lock<std::mutex> lk(startupLock_);
//issue a memory fence here and in threads (constructor was segfaulting without this)
thread_quit_signal.push_back(false);
workerJob_.push_back(ReaderInfo(nullptr, nullptr));
cvReader_.push_back(new std::condition_variable);
cvReader_.push_back(std::make_unique<std::condition_variable>());
tid_active_.push_back(0);
threadInit_.store(false, std::memory_order_release);
}

//start threads
for (unsigned int i = 0; i < numConcurrentReads_; i++) {
//wait for each thread to complete initialization
std::unique_lock<std::mutex> lk(startupLock_);
workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
startupCv_.wait(lk);
}
Expand Down Expand Up @@ -205,15 +209,6 @@ FedRawDataInputSource::~FedRawDataInputSource() {
delete workerThreads_[i];
}
}
for (unsigned int i = 0; i < numConcurrentReads_; i++)
delete cvReader_[i];
/*
for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
InputChunk *ch;
while (!freeChunks_.try_pop(ch)) {}
delete ch;
}
*/
}

void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
Expand Down Expand Up @@ -244,7 +239,6 @@ void FedRawDataInputSource::fillDescriptions(edm::ConfigurationDescriptions& des
edm::RawInputSource::Next FedRawDataInputSource::checkNext() {
if (!startedSupervisorThread_) {
//this thread opens new files and dispatches reading to worker readers
//threadInit_.store(false,std::memory_order_release);
std::unique_lock<std::mutex> lk(startupLock_);
readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
startedSupervisorThread_ = true;
Expand Down Expand Up @@ -765,7 +759,6 @@ void FedRawDataInputSource::rewind_() {}
void FedRawDataInputSource::readSupervisor() {
bool stop = false;
unsigned int currentLumiSection = 0;
//threadInit_.exchange(true,std::memory_order_acquire);

{
std::unique_lock<std::mutex> lk(startupLock_);
Expand Down Expand Up @@ -1268,7 +1261,6 @@ void FedRawDataInputSource::readSupervisor() {

void FedRawDataInputSource::readWorker(unsigned int tid) {
bool init = true;
threadInit_.exchange(true, std::memory_order_acquire);

while (true) {
tid_active_[tid] = false;
Expand Down