From 1a8b3775885b3ee5229340f54fd1f2df1cc0de70 Mon Sep 17 00:00:00 2001 From: Matti Kortelainen Date: Tue, 30 Aug 2022 16:57:20 +0200 Subject: [PATCH 1/2] Initialize stream workers before processing empty (End)Paths --- FWCore/Framework/src/StreamSchedule.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 20338b0791a30..cd0614afadb0e 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -890,6 +890,12 @@ namespace edm { ServiceRegistry::Operate guard(serviceToken); Traits::preScheduleSignal(actReg_.get(), &streamContext_); + // Data dependencies need to be set up before marking empty + // (End)Paths complete in case something consumes the status of + // the empty (EndPath) + workerManager_.setupResolvers(ep); + workerManager_.setupOnDemandSystem(info); + HLTPathStatus hltPathStatus(hlt::Pass, 0); for (int empty_trig_path : empty_trig_paths_) { results_->at(empty_trig_path) = hltPathStatus; @@ -912,9 +918,6 @@ namespace edm { } } - workerManager_.setupResolvers(ep); - workerManager_.setupOnDemandSystem(info); - ++total_events_; //use to give priorities on an error to ones from Paths From ddf3b87614816a13be7d808070a45a8d49de03cf Mon Sep 17 00:00:00 2001 From: Matti Kortelainen Date: Wed, 21 Sep 2022 17:08:25 +0200 Subject: [PATCH 2/2] Use Worker's WaitingTaskList directly in PuttableProductResolver This change was triggered by a case where a PuttableProductResolver was filled by a Worker that produced many products, and one of the products (A) had a Ref to another product (B), and the product B was not consumed by any module (it was only accessed through the Ref). Since the putProduct() released the WaitingTaskList of the Resolver, that led to the consumer of A being run, and that consumer dereferenced the Ref to see that B was not there. 
Besides scheduled modules, the other cases where PuttableProductResolver is used are Sources inheriting PuttableSourceBase, and TestProcessor. In these cases the products are put into the Resolvers (or left as non-produced) before launching the prefetching of the unscheduled system. Therefore in these use cases the consuming modules do not need to wait for the Resolver to be filled. After several fix attempts it seemed easiest to just use the Worker's WaitingTaskList directly in PuttableProductResolver. This approach fulfills the requirements of both Worker and Source(-like) use cases, and even simplifies the code. --- FWCore/Framework/interface/maker/Worker.h | 3 ++ FWCore/Framework/src/ProductResolvers.cc | 46 +++++------------------ FWCore/Framework/src/ProductResolvers.h | 14 +++---- 3 files changed, 20 insertions(+), 43 deletions(-) diff --git a/FWCore/Framework/interface/maker/Worker.h b/FWCore/Framework/interface/maker/Worker.h index aab872680fc3c..aee2fcd6e799c 100644 --- a/FWCore/Framework/interface/maker/Worker.h +++ b/FWCore/Framework/interface/maker/Worker.h @@ -241,6 +241,9 @@ namespace edm { virtual bool hasAccumulator() const = 0; + // Used in PuttableProductResolver + edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; } + protected: template friend class workerhelper::CallImpl; diff --git a/FWCore/Framework/src/ProductResolvers.cc b/FWCore/Framework/src/ProductResolvers.cc index d6acf88e3e55d..b532d92981d08 100644 --- a/FWCore/Framework/src/ProductResolvers.cc +++ b/FWCore/Framework/src/ProductResolvers.cc @@ -407,47 +407,21 @@ namespace edm { } } - //Need to try modifying prefetchRequested_ before adding to m_waitingTasks - bool expected = false; - bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true); - m_waitingTasks.add(waitTask); - - if (worker_ and prefetchRequested) { - //using a waiting task to do a callback guarantees that - // the m_waitingTasks list will be released from waiting even - // if the 
module does not put this data product or the - // module has an exception while running - - auto waiting = make_waiting_task([this](std::exception_ptr const* iException) { - if (nullptr != iException) { - m_waitingTasks.doneWaiting(*iException); - } else { - m_waitingTasks.doneWaiting(std::exception_ptr()); - } - }); - worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting)); + if (waitingTasks_) { + // using a waiting task to do a callback guarantees that the + // waitingTasks_ list (from the worker) will be released from + // waiting even if the module does not put this data product + // or the module has an exception while running + waitingTasks_->add(waitTask); } } } - void PuttableProductResolver::putProduct(std::unique_ptr edp) const { - ProducedProductResolver::putProduct(std::move(edp)); - bool expected = false; - if (prefetchRequested_.compare_exchange_strong(expected, true)) { - m_waitingTasks.doneWaiting(std::exception_ptr()); - } - } - - void PuttableProductResolver::resetProductData_(bool deleteEarly) { - if (not deleteEarly) { - prefetchRequested_ = false; - m_waitingTasks.reset(); - } - DataManagingProductResolver::resetProductData_(deleteEarly); - } - void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) { - worker_ = iConfigure.findWorker(branchDescription().moduleLabel()); + auto worker = iConfigure.findWorker(branchDescription().moduleLabel()); + if (worker) { + waitingTasks_ = &worker->waitingTaskList(); + } } void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) { diff --git a/FWCore/Framework/src/ProductResolvers.h b/FWCore/Framework/src/ProductResolvers.h index 530cffea7b805..efe681ce6065c 100644 --- a/FWCore/Framework/src/ProductResolvers.h +++ b/FWCore/Framework/src/ProductResolvers.h @@ -201,7 +201,7 @@ namespace edm { class PuttableProductResolver : public ProducedProductResolver { public: explicit PuttableProductResolver(std::shared_ptr bd) - : 
ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {} + : ProducedProductResolver(bd, ProductStatus::NotPut) {} void setupUnscheduled(UnscheduledConfigurator const&) final; @@ -218,12 +218,12 @@ namespace edm { ModuleCallingContext const* mcc) const override; bool unscheduledWasNotRun_() const override { return false; } - void putProduct(std::unique_ptr edp) const override; - void resetProductData_(bool deleteEarly) override; - - CMS_THREAD_SAFE mutable WaitingTaskList m_waitingTasks; - Worker* worker_; - mutable std::atomic prefetchRequested_; + // The WaitingTaskList below is the one from the worker, if one + // corresponds to this ProductResolver. For the Source-like cases + // where there is no such Worker, the tasks depending on the data + // depending on this ProductResolver are assumed to be eligible to + // run immediately after their prefetch. + WaitingTaskList* waitingTasks_ = nullptr; }; class UnscheduledProductResolver : public ProducedProductResolver {