-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathSecondaryEventProvider.cc
135 lines (123 loc) · 6.62 KB
/
SecondaryEventProvider.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include "FWCore/Concurrency/interface/include_first_syncWait.h"
#include "Mixing/Base/src/SecondaryEventProvider.h"
#include "FWCore/Common/interface/ProcessBlockHelper.h"
#include "FWCore/Framework/interface/ExceptionActions.h"
#include "FWCore/Framework/interface/PreallocationConfiguration.h"
#include "FWCore/Framework/interface/TransitionInfoTypes.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "oneapi/tbb/task_arena.h"
namespace {
template <typename T, typename U>
void processOneOccurrence(edm::WorkerManager& manager,
typename T::TransitionInfoType& info,
edm::StreamID streamID,
typename T::Context const* topContext,
U const* context,
bool cleaningUpAfterException = false) {
manager.resetAll();
if (manager.allWorkers().empty())
return;
auto token = edm::ServiceRegistry::instance().presentToken();
//we need the arena to guarantee that the syncWait will return to this thread
// and not cause this callstack to possibly be moved to a new thread
tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
std::exception_ptr exceptPtr = localArena.execute([&]() {
return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
});
});
if (exceptPtr) {
try {
edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
} catch (cms::Exception& ex) {
if (ex.context().empty()) {
edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
} else {
edm::addContextAndPrintException("", ex, cleaningUpAfterException);
}
throw;
}
}
}
} // namespace
namespace edm {
SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
ProductRegistry& preg,
std::shared_ptr<ProcessConfiguration> processConfiguration)
: exceptionToActionTable_(new ExceptionToActionTable),
workerManager_(std::make_shared<ActivityRegistry>(), *exceptionToActionTable_) {
std::vector<std::string> shouldBeUsedLabels;
std::set<std::string> unscheduledLabels;
const PreallocationConfiguration preallocConfig;
for (auto& pset : psets) {
std::string label = pset.getParameter<std::string>("@module_label");
workerManager_.addToUnscheduledWorkers(
pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
}
if (!unscheduledLabels.empty()) {
preg.setUnscheduledProducts(unscheduledLabels);
}
} // SecondaryEventProvider::SecondaryEventProvider
void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
eventsetup::ESRecordsToProxyIndices const& iIndices) {
ProcessBlockHelper dummyProcessBlockHelper;
workerManager_.beginJob(iRegistry, iIndices, dummyProcessBlockHelper);
}
//NOTE: When the Stream interfaces are propagated to the modules, this code must be updated
// to also send the stream based transitions
void SecondaryEventProvider::beginRun(RunPrincipal& run,
const EventSetupImpl& setup,
ModuleCallingContext const* mcc,
StreamContext& sContext) {
RunTransitionInfo info(run, setup);
processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> >(
workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> >(
workerManager_, info, sContext.streamID(), &sContext, mcc);
}
void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
const EventSetupImpl& setup,
ModuleCallingContext const* mcc,
StreamContext& sContext) {
LumiTransitionInfo info(lumi, setup);
processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin> >(
workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin> >(
workerManager_, info, sContext.streamID(), &sContext, mcc);
}
void SecondaryEventProvider::endRun(RunPrincipal& run,
const EventSetupImpl& setup,
ModuleCallingContext const* mcc,
StreamContext& sContext) {
RunTransitionInfo info(run, setup);
processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> >(
workerManager_, info, sContext.streamID(), &sContext, mcc);
processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> >(
workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
}
void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
const EventSetupImpl& setup,
ModuleCallingContext const* mcc,
StreamContext& sContext) {
LumiTransitionInfo info(lumi, setup);
processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd> >(
workerManager_, info, sContext.streamID(), &sContext, mcc);
processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd> >(
workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
}
void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
const EventSetupImpl& setup,
StreamContext& sContext) {
workerManager_.setupResolvers(ep);
EventTransitionInfo info(ep, setup);
workerManager_.setupOnDemandSystem(info);
}
void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext& sContext) {
workerManager_.beginStream(iID, sContext);
}
void SecondaryEventProvider::endStream(edm::StreamID iID, StreamContext& sContext) {
workerManager_.endStream(iID, sContext);
}
} // namespace edm