From cfbbed1d51f02d6e33110f82e14caf644e9a6ba9 Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Sun, 14 May 2023 20:06:17 +0200 Subject: [PATCH 1/2] - intialize vectors before spawning threads to avoid waiting on a condition-variable thread that is part of a vector being expanded. - remove synchronization point since creation of a thread already implies a memory barrier. --- .../interface/FedRawDataInputSource.h | 2 +- .../Utilities/src/FedRawDataInputSource.cc | 24 +++++++------------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/EventFilter/Utilities/interface/FedRawDataInputSource.h b/EventFilter/Utilities/interface/FedRawDataInputSource.h index 2ddb518923c7b..62333475f1287 100644 --- a/EventFilter/Utilities/interface/FedRawDataInputSource.h +++ b/EventFilter/Utilities/interface/FedRawDataInputSource.h @@ -154,7 +154,7 @@ class FedRawDataInputSource : public edm::RawInputSource { tbb::concurrent_queue> fileQueue_; std::mutex mReader_; - std::vector cvReader_; + std::vector> cvReader_; std::vector tid_active_; std::atomic quit_threads_; diff --git a/EventFilter/Utilities/src/FedRawDataInputSource.cc b/EventFilter/Utilities/src/FedRawDataInputSource.cc index f2fa8529fcc35..abe9072eae84c 100644 --- a/EventFilter/Utilities/src/FedRawDataInputSource.cc +++ b/EventFilter/Utilities/src/FedRawDataInputSource.cc @@ -155,14 +155,18 @@ FedRawDataInputSource::FedRawDataInputSource(edm::ParameterSet const& pset, edm: quit_threads_ = false; + //prepare data shared by threads for (unsigned int i = 0; i < numConcurrentReads_; i++) { - std::unique_lock lk(startupLock_); - //issue a memory fence here and in threads (constructor was segfaulting without this) thread_quit_signal.push_back(false); workerJob_.push_back(ReaderInfo(nullptr, nullptr)); - cvReader_.push_back(new std::condition_variable); + cvReader_.push_back(std::make_unique()); tid_active_.push_back(0); - threadInit_.store(false, std::memory_order_release); + } + + //start threads + for (unsigned int i = 0; i < numConcurrentReads_; i++) { + //wait for each thread to complete initialization + std::unique_lock lk(startupLock_); workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i)); startupCv_.wait(lk); } @@ -205,15 +209,6 @@ FedRawDataInputSource::~FedRawDataInputSource() { delete workerThreads_[i]; } } - for (unsigned int i = 0; i < numConcurrentReads_; i++) - delete cvReader_[i]; - /* - for (unsigned int i=0;i lk(startupLock_); readSupervisorThread_ = std::make_unique(&FedRawDataInputSource::readSupervisor, this); startedSupervisorThread_ = true; @@ -765,7 +759,6 @@ void FedRawDataInputSource::rewind_() {} void FedRawDataInputSource::readSupervisor() { bool stop = false; unsigned int currentLumiSection = 0; - //threadInit_.exchange(true,std::memory_order_acquire); { std::unique_lock lk(startupLock_); @@ -1268,7 +1261,6 @@ void FedRawDataInputSource::readSupervisor() { void FedRawDataInputSource::readWorker(unsigned int tid) { bool init = true; - threadInit_.exchange(true, std::memory_order_acquire); while (true) { tid_active_[tid] = false; From a2d9782517e1e371c13f75e2c6d7a698c19aef9e Mon Sep 17 00:00:00 2001 From: Srecko Morovic Date: Sun, 14 May 2023 20:22:20 +0200 Subject: [PATCH 2/2] DAQSource (same change as in FedRawDataInputSource) - intialize vectors before spawning threads to avoid waiting on a condition-variable thread that is part of a vector being expanded. - remove synchronization point since creation of a thread already implies a memory barrier. --- EventFilter/Utilities/interface/DAQSource.h | 2 +- EventFilter/Utilities/src/DAQSource.cc | 21 ++++++++------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/EventFilter/Utilities/interface/DAQSource.h b/EventFilter/Utilities/interface/DAQSource.h index 3a7f96a1ca9f5..4d808c9de88b0 100644 --- a/EventFilter/Utilities/interface/DAQSource.h +++ b/EventFilter/Utilities/interface/DAQSource.h @@ -146,7 +146,7 @@ class DAQSource : public edm::RawInputSource { tbb::concurrent_queue> fileQueue_; std::mutex mReader_; - std::vector cvReader_; + std::vector> cvReader_; std::vector tid_active_; std::atomic quit_threads_; diff --git a/EventFilter/Utilities/src/DAQSource.cc b/EventFilter/Utilities/src/DAQSource.cc index 219d814ee7a39..36dc59356db75 100644 --- a/EventFilter/Utilities/src/DAQSource.cc +++ b/EventFilter/Utilities/src/DAQSource.cc @@ -160,14 +160,18 @@ DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription quit_threads_ = false; + //prepare data shared by threads for (unsigned int i = 0; i < numConcurrentReads_; i++) { - std::unique_lock lk(startupLock_); - //issue a memory fence here and in threads (constructor was segfaulting without this) thread_quit_signal.push_back(false); workerJob_.push_back(ReaderInfo(nullptr, nullptr)); - cvReader_.push_back(new std::condition_variable); + cvReader_.push_back(std::make_unique()); tid_active_.push_back(0); - threadInit_.store(false, std::memory_order_release); + } + + //start threads + for (unsigned int i = 0; i < numConcurrentReads_; i++) { + //wait for each thread to complete initialization + std::unique_lock lk(startupLock_); workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i)); startupCv_.wait(lk); } @@ -211,15 +215,6 @@ DAQSource::~DAQSource() { delete workerThreads_[i]; } } - for (unsigned int i = 0; i < numConcurrentReads_; i++) - delete cvReader_[i]; - /* - for (unsigned int i=0;i