diff --git a/EventFilter/Utilities/interface/DAQSource.h b/EventFilter/Utilities/interface/DAQSource.h index 3a7f96a1ca9f5..4d808c9de88b0 100644 --- a/EventFilter/Utilities/interface/DAQSource.h +++ b/EventFilter/Utilities/interface/DAQSource.h @@ -146,7 +146,7 @@ class DAQSource : public edm::RawInputSource { tbb::concurrent_queue> fileQueue_; std::mutex mReader_; - std::vector cvReader_; + std::vector> cvReader_; std::vector tid_active_; std::atomic quit_threads_; diff --git a/EventFilter/Utilities/interface/FedRawDataInputSource.h b/EventFilter/Utilities/interface/FedRawDataInputSource.h index 2ddb518923c7b..62333475f1287 100644 --- a/EventFilter/Utilities/interface/FedRawDataInputSource.h +++ b/EventFilter/Utilities/interface/FedRawDataInputSource.h @@ -154,7 +154,7 @@ class FedRawDataInputSource : public edm::RawInputSource { tbb::concurrent_queue> fileQueue_; std::mutex mReader_; - std::vector cvReader_; + std::vector> cvReader_; std::vector tid_active_; std::atomic quit_threads_; diff --git a/EventFilter/Utilities/src/DAQSource.cc b/EventFilter/Utilities/src/DAQSource.cc index 219d814ee7a39..36dc59356db75 100644 --- a/EventFilter/Utilities/src/DAQSource.cc +++ b/EventFilter/Utilities/src/DAQSource.cc @@ -160,14 +160,18 @@ DAQSource::DAQSource(edm::ParameterSet const& pset, edm::InputSourceDescription quit_threads_ = false; + //prepare data shared by threads for (unsigned int i = 0; i < numConcurrentReads_; i++) { - std::unique_lock lk(startupLock_); - //issue a memory fence here and in threads (constructor was segfaulting without this) thread_quit_signal.push_back(false); workerJob_.push_back(ReaderInfo(nullptr, nullptr)); - cvReader_.push_back(new std::condition_variable); + cvReader_.push_back(std::make_unique()); tid_active_.push_back(0); - threadInit_.store(false, std::memory_order_release); + } + + //start threads + for (unsigned int i = 0; i < numConcurrentReads_; i++) { + //wait for each thread to complete initialization + std::unique_lock lk(startupLock_); workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i)); startupCv_.wait(lk); } @@ -211,15 +215,6 @@ DAQSource::~DAQSource() { delete workerThreads_[i]; } } - for (unsigned int i = 0; i < numConcurrentReads_; i++) - delete cvReader_[i]; - /* - for (unsigned int i=0;i lk(startupLock_); - //issue a memory fence here and in threads (constructor was segfaulting without this) thread_quit_signal.push_back(false); workerJob_.push_back(ReaderInfo(nullptr, nullptr)); - cvReader_.push_back(new std::condition_variable); + cvReader_.push_back(std::make_unique()); tid_active_.push_back(0); - threadInit_.store(false, std::memory_order_release); + } + + //start threads + for (unsigned int i = 0; i < numConcurrentReads_; i++) { + //wait for each thread to complete initialization + std::unique_lock lk(startupLock_); workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i)); startupCv_.wait(lk); } @@ -205,15 +209,6 @@ FedRawDataInputSource::~FedRawDataInputSource() { delete workerThreads_[i]; } } - for (unsigned int i = 0; i < numConcurrentReads_; i++) - delete cvReader_[i]; - /* - for (unsigned int i=0;i lk(startupLock_); readSupervisorThread_ = std::make_unique(&FedRawDataInputSource::readSupervisor, this); startedSupervisorThread_ = true; @@ -765,7 +759,6 @@ void FedRawDataInputSource::rewind_() {} void FedRawDataInputSource::readSupervisor() { bool stop = false; unsigned int currentLumiSection = 0; - //threadInit_.exchange(true,std::memory_order_acquire); { std::unique_lock lk(startupLock_); @@ -1268,7 +1261,6 @@ void FedRawDataInputSource::readSupervisor() { void FedRawDataInputSource::readWorker(unsigned int tid) { bool init = true; - threadInit_.exchange(true, std::memory_order_acquire); while (true) { tid_active_[tid] = false;