Skip to content

Commit

Permalink
Merge pull request #45017 from wddgit/exceptionBehaviorBeginEndRun
Browse files Browse the repository at this point in the history
Improve behavior after exception in begin/end run transitions
  • Loading branch information
cmsbuild authored Jun 3, 2024
2 parents 4e5614a + 27c49ad commit 36f6e95
Show file tree
Hide file tree
Showing 33 changed files with 1,230 additions and 557 deletions.
5 changes: 1 addition & 4 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,7 @@ namespace edm {

InputSource::ItemType processRuns();
void beginRunAsync(IOVSyncValue const&, WaitingTaskHolder);
void streamBeginRunAsync(unsigned int iStream,
std::shared_ptr<RunProcessingStatus>,
bool precedingTasksSucceeded,
WaitingTaskHolder);
void streamBeginRunAsync(unsigned int iStream, std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder) noexcept;
void releaseBeginRunResources(unsigned int iStream);
void endRunAsync(std::shared_ptr<RunProcessingStatus>, WaitingTaskHolder);
void handleEndRunExceptions(std::exception_ptr, WaitingTaskHolder const&);
Expand Down
9 changes: 3 additions & 6 deletions FWCore/Framework/interface/GlobalSchedule.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#ifndef FWCore_Framework_GlobalSchedule_h
#define FWCore_Framework_GlobalSchedule_h

/*
*/

#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
Expand Down Expand Up @@ -103,6 +100,9 @@ namespace edm {
AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }

private:
/// returns the action table
ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }

template <typename T>
void preScheduleSignal(GlobalContext const*, ServiceToken const&);

Expand All @@ -114,9 +114,6 @@ namespace edm {
bool cleaningUpAfterException,
std::exception_ptr&);

/// returns the action table
ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }

std::vector<WorkerManager> workerManagers_;
std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.
std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
Expand Down
20 changes: 1 addition & 19 deletions FWCore/Framework/interface/Path.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,7 @@ namespace edm {

Path& operator=(Path const&) = delete;

template <typename T>
void runAllModulesAsync(WaitingTaskHolder,
typename T::TransitionInfoType const&,
ServiceToken const&,
StreamID const&,
typename T::Context const*);

void processOneOccurrenceAsync(
void processEventUsingPathAsync(
WaitingTaskHolder, EventTransitionInfo const&, ServiceToken const&, StreamID const&, StreamContext const*);

int bitPosition() const { return bitpos_; }
Expand Down Expand Up @@ -178,17 +171,6 @@ namespace edm {
};
} // namespace

template <typename T>
void Path::runAllModulesAsync(WaitingTaskHolder task,
typename T::TransitionInfoType const& info,
ServiceToken const& token,
StreamID const& streamID,
typename T::Context const* context) {
for (auto& worker : workers_) {
worker.runWorkerAsync<T>(task, info, token, streamID, context);
}
}

} // namespace edm

#endif
164 changes: 64 additions & 100 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
Expand All @@ -99,12 +100,10 @@

namespace edm {

class ActivityRegistry;
class BranchIDListHelper;
class ExceptionCollector;
class ExceptionToActionTable;
class OutputModuleCommunicator;
class ProcessContext;
class UnscheduledCallProducer;
class WorkerInPath;
class ModuleRegistry;
Expand All @@ -120,38 +119,6 @@ namespace edm {
class TriggerNamesService;
}

namespace {
template <typename T>
class StreamScheduleSignalSentry {
public:
StreamScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
: a_(a), context_(context), allowThrow_(false) {
if (a_)
T::preScheduleSignal(a_, context_);
}
~StreamScheduleSignalSentry() noexcept(false) {
// Caught exception is rethrown (when allowed)
CMS_SA_ALLOW try {
if (a_) {
T::postScheduleSignal(a_, context_);
}
} catch (...) {
if (allowThrow_) {
throw;
}
}
}

void allowThrow() { allowThrow_ = true; }

private:
// We own none of these resources.
ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
typename T::Context const* context_;
bool allowThrow_;
};
} // namespace

class StreamSchedule {
public:
typedef std::vector<std::string> vstring;
Expand Down Expand Up @@ -254,9 +221,13 @@ namespace edm {
edm::ProductRegistry const& preg);

/// returns the collection of pointers to workers
AllWorkers const& allWorkers() const { return workerManager_.allWorkers(); }
AllWorkers const& allWorkersBeginEnd() const { return workerManagerBeginEnd_.allWorkers(); }
AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }

AllWorkers const& unscheduledWorkers() const { return workerManager_.unscheduledWorkers(); }
AllWorkers const& unscheduledWorkersLumisAndEvents() const {
return workerManagerLumisAndEvents_.unscheduledWorkers();
}
unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }

StreamContext const& context() const { return streamContext_; }
Expand All @@ -269,28 +240,8 @@ namespace edm {
};

private:
//Sentry class to only send a signal if an
// exception occurs. An exception is identified
// by the destructor being called without first
// calling completedSuccessfully().
class SendTerminationSignalIfException {
public:
SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::StreamContext const* iContext)
: reg_(iReg), context_(iContext) {}
~SendTerminationSignalIfException() {
if (reg_) {
reg_->preStreamEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
}
}
void completedSuccessfully() { reg_ = nullptr; }

private:
edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
StreamContext const* context_;
};

/// returns the action table
ExceptionToActionTable const& actionTable() const { return workerManager_.actionTable(); }
ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }

void resetAll();

Expand Down Expand Up @@ -351,7 +302,20 @@ namespace edm {
std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
ExceptionToActionTable const& actions);

WorkerManager workerManager_;
template <typename T>
void preScheduleSignal(StreamContext const*) const;

template <typename T>
void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;

void handleException(StreamContext const&,
ServiceWeakToken const&,
bool cleaningUpAfterException,
std::exception_ptr&) const noexcept;

WorkerManager workerManagerBeginEnd_;
WorkerManager workerManagerRuns_;
WorkerManager workerManagerLumisAndEvents_;
std::shared_ptr<ActivityRegistry> actReg_; // We do not use propagate_const because the registry itself is mutable.

edm::propagate_const<TrigResPtr> results_;
Expand Down Expand Up @@ -409,37 +373,9 @@ namespace edm {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

// We are already handling an earlier exception, so ignore it
// if this signal results in another exception being thrown.
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
} catch (...) {
}
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(&streamContext_, weakToken, excpt);
iHolder.doneWaiting(excpt);
});

Expand All @@ -448,24 +384,19 @@ namespace edm {
auto token = weakToken.lock();
ServiceRegistry::Operate op(token);
// Caught exception is propagated via WaitingTaskHolder
WorkerManager* workerManager = &workerManagerRuns_;
if (T::branchType_ == InLumi) {
workerManager = &workerManagerLumisAndEvents_;
}
CMS_SA_ALLOW try {
T::preScheduleSignal(actReg_.get(), &streamContext_);

workerManager_.resetAll();
preScheduleSignal<T>(&streamContext_);
workerManager->resetAll();
} catch (...) {
h.doneWaiting(std::current_exception());
return;
}

for (auto& p : end_paths_) {
p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
}

for (auto& p : trig_paths_) {
p.runAllModulesAsync<T>(h, info, token, streamID_, &streamContext_);
}

workerManager_.processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
});

if (streamID_.value() == 0) {
Expand All @@ -484,6 +415,39 @@ namespace edm {
});
}
}

template <typename T>
void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
try {
convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
} catch (cms::Exception& ex) {
std::ostringstream ost;
ex.addContext("Handling pre signal, likely in a service function");
exceptionContext(ost, *streamContext);
ex.addContext(ost.str());
throw;
}
}

template <typename T>
void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
ServiceWeakToken const& weakToken,
std::exception_ptr& excpt) const noexcept {
try {
convertException::wrap([this, &weakToken, streamContext]() {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), streamContext);
});
} catch (cms::Exception& ex) {
if (not excpt) {
std::ostringstream ost;
ex.addContext("Handling post signal, likely in a service function");
exceptionContext(ost, *streamContext);
ex.addContext(ost.str());
excpt = std::current_exception();
}
}
}
} // namespace edm

#endif
45 changes: 7 additions & 38 deletions FWCore/Framework/interface/UnscheduledCallProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,22 @@
*/

#include "FWCore/Framework/interface/BranchActionType.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/OccurrenceTraits.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Framework/interface/maker/Worker.h"
#include "FWCore/Framework/interface/UnscheduledAuxiliary.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Utilities/interface/Signal.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <vector>
#include <unordered_map>
#include <string>
#include <sstream>
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

namespace edm {

class EventTransitionInfo;
class ModuleCallingContext;

class UnscheduledCallProducer {
public:
Expand Down Expand Up @@ -67,28 +64,6 @@ namespace edm {
const_iterator end() const { return unscheduledWorkers_.end(); }
worker_container const& workers() const { return unscheduledWorkers_; }

template <typename T, typename U>
void runNowAsync(WaitingTaskHolder task,
typename T::TransitionInfoType const& info,
ServiceToken const& token,
StreamID streamID,
typename T::Context const* topContext,
U const* context) const noexcept {
//do nothing for event since we will run when requested
if (!T::isEvent_) {
for (auto worker : unscheduledWorkers_) {
ParentContext parentContext(context);

// We do not need to run prefetching here because this only handles
// stream transitions for runs and lumis. There are no products put
// into the runs or lumis in stream transitions, so there can be
// no data dependencies which require prefetching. Prefetching is
// needed for global transitions, but they are run elsewhere.
worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
}
}
}

template <typename T>
void runAccumulatorsAsync(WaitingTaskHolder task,
typename T::TransitionInfoType const& info,
Expand All @@ -102,12 +77,6 @@ namespace edm {
}

private:
template <typename T, typename ID>
void addContextToException(cms::Exception& ex, Worker const* worker, ID const& id) const {
std::ostringstream ost;
ost << "Processing " << T::transitionName() << " " << id;
ex.addContext(ost.str());
}
worker_container unscheduledWorkers_;
worker_container accumulatorWorkers_;
UnscheduledAuxiliary aux_;
Expand Down
Loading

0 comments on commit 36f6e95

Please sign in to comment.