Skip to content

Commit

Permalink
Add ActivityRegistry signals for between Events
Browse files Browse the repository at this point in the history
Added a pair of signas that happens when the source is determining what the next transition and another pair when the Event data products are being deleted.
  • Loading branch information
Dr15Jones committed Nov 6, 2023
1 parent 324fa41 commit 645964a
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 6 deletions.
41 changes: 35 additions & 6 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -860,14 +860,23 @@ namespace edm {
return returnValue;
}

namespace {
struct SourceNextGuard {
SourceNextGuard(edm::ActivityRegistry& iReg) : act_(iReg) { iReg.preSourceNextTransitionSignal_.emit(); }
~SourceNextGuard() { act_.postSourceNextTransitionSignal_.emit(); }
edm::ActivityRegistry& act_;
};
} // namespace
InputSource::ItemType EventProcessor::nextTransitionType() {
SendSourceTerminationSignalIfException sentry(actReg_.get());
InputSource::ItemType itemType;
//For now, do nothing with InputSource::IsSynchronize
do {
itemType = input_->nextItemType();
} while (itemType == InputSource::IsSynchronize);

{
SourceNextGuard guard(*actReg_.get());
//For now, do nothing with InputSource::IsSynchronize
do {
itemType = input_->nextItemType();
} while (itemType == InputSource::IsSynchronize);
}
lastSourceTransition_ = itemType;
sentry.completedSuccessfully();

Expand Down Expand Up @@ -2282,6 +2291,18 @@ namespace edm {
iHolder.group()->run([this, iHolder, iStreamIndex]() { processEventAsyncImpl(iHolder, iStreamIndex); });
}

namespace {
struct ClearEventGuard {
ClearEventGuard(edm::ActivityRegistry& iReg, edm::StreamContext const& iContext)
: act_(iReg), context_(iContext) {
iReg.preClearEventSignal_.emit(iContext);
}
~ClearEventGuard() { act_.postClearEventSignal_.emit(context_); }
edm::ActivityRegistry& act_;
edm::StreamContext const& context_;
};
} // namespace

void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
auto pep = &(principalCache_.eventPrincipal(iStreamIndex));

Expand All @@ -2305,8 +2326,16 @@ namespace edm {
//NOTE: behavior change. previously if an exception happened looper was still called. Now it will not be called
ServiceRegistry::Operate operateLooper(serviceToken_);
processEventWithLooper(*pep, iStreamIndex);
}) | then([pep](auto nextTask) {
}) | then([this, pep](auto nextTask) {
FDEBUG(1) << "\tprocessEvent\n";
StreamContext streamContext(pep->streamID(),
StreamContext::Transition::kEvent,
pep->id(),
pep->runPrincipal().index(),
pep->luminosityBlockPrincipal().index(),
pep->time(),
&processContext_);
ClearEventGuard guard(*this->actReg_.get(), streamContext);
pep->clearEventPrincipal();
}) | runLast(iHolder);
}
Expand Down
28 changes: 28 additions & 0 deletions FWCore/ServiceRegistry/interface/ActivityRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ namespace edm {
void watchJobFailure(JobFailure::slot_type const& iSlot) { jobFailureSignal_.connect_front(iSlot); }
AR_WATCH_USING_METHOD_0(watchJobFailure)

/// signal is emitted before the source is requested to find the next transition
typedef signalslot::Signal<void()> PreSourceNextTransition;
PreSourceNextTransition preSourceNextTransitionSignal_;
void watchPreSourceNextTransition(PreSourceNextTransition::slot_type const& iSlot) {
preSourceNextTransitionSignal_.connect(iSlot);
}
AR_WATCH_USING_METHOD_0(watchPreSourceNextTransition)

/// signal is emitted after the source has returned the next transition
typedef signalslot::Signal<void()> PostSourceNextTransition;
PostSourceNextTransition postSourceNextTransitionSignal_;
void watchPostSourceNextTransition(PostSourceNextTransition::slot_type const& iSlot) {
postSourceNextTransitionSignal_.connect_front(iSlot);
}
AR_WATCH_USING_METHOD_0(watchPostSourceNextTransition)

/// signal is emitted before the source starts creating an Event
typedef signalslot::Signal<void(StreamID)> PreSourceEvent;
PreSourceEvent preSourceSignal_;
Expand Down Expand Up @@ -502,6 +518,18 @@ namespace edm {
void watchPostEvent(PostEvent::slot_type const& iSlot) { postEventSignal_.connect_front(iSlot); }
AR_WATCH_USING_METHOD_1(watchPostEvent)

typedef signalslot::Signal<void(StreamContext const&)> PreClearEvent;
/// signal is emitted before the data products in the Event are cleared
PreClearEvent preClearEventSignal_;
void watchPreClearEvent(PreClearEvent::slot_type const& iSlot) { preClearEventSignal_.connect(iSlot); }
AR_WATCH_USING_METHOD_1(watchPreClearEvent)

typedef signalslot::Signal<void(StreamContext const&)> PostClearEvent;
/// signal is emitted after all data products in the Event have been cleared
PostClearEvent postClearEventSignal_;
void watchPostClearEvent(PostClearEvent::slot_type const& iSlot) { postClearEventSignal_.connect_front(iSlot); }
AR_WATCH_USING_METHOD_1(watchPostClearEvent)

/// signal is emitted before starting to process a Path for an event
typedef signalslot::Signal<void(StreamContext const&, PathContext const&)> PrePathEvent;
PrePathEvent prePathEventSignal_;
Expand Down
12 changes: 12 additions & 0 deletions FWCore/ServiceRegistry/src/ActivityRegistry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ namespace edm {
preSourceSignal_.connect(std::cref(iOther.preSourceSignal_));
postSourceSignal_.connect(std::cref(iOther.postSourceSignal_));

preSourceNextTransitionSignal_.connect(std::cref(iOther.preSourceNextTransitionSignal_));
postSourceNextTransitionSignal_.connect(std::cref(iOther.postSourceNextTransitionSignal_));

preSourceLumiSignal_.connect(std::cref(iOther.preSourceLumiSignal_));
postSourceLumiSignal_.connect(std::cref(iOther.postSourceLumiSignal_));

Expand Down Expand Up @@ -183,6 +186,9 @@ namespace edm {
preEventSignal_.connect(std::cref(iOther.preEventSignal_));
postEventSignal_.connect(std::cref(iOther.postEventSignal_));

preClearEventSignal_.connect(std::cref(iOther.preClearEventSignal_));
postClearEventSignal_.connect(std::cref(iOther.postClearEventSignal_));

prePathEventSignal_.connect(std::cref(iOther.prePathEventSignal_));
postPathEventSignal_.connect(std::cref(iOther.postPathEventSignal_));

Expand Down Expand Up @@ -343,6 +349,9 @@ namespace edm {
copySlotsToFrom(preSourceSignal_, iOther.preSourceSignal_);
copySlotsToFromReverse(postSourceSignal_, iOther.postSourceSignal_);

copySlotsToFrom(preSourceNextTransitionSignal_, iOther.preSourceNextTransitionSignal_);
copySlotsToFromReverse(postSourceNextTransitionSignal_, iOther.postSourceNextTransitionSignal_);

copySlotsToFrom(preSourceLumiSignal_, iOther.preSourceLumiSignal_);
copySlotsToFromReverse(postSourceLumiSignal_, iOther.postSourceLumiSignal_);

Expand Down Expand Up @@ -409,6 +418,9 @@ namespace edm {
copySlotsToFrom(preEventSignal_, iOther.preEventSignal_);
copySlotsToFromReverse(postEventSignal_, iOther.postEventSignal_);

copySlotsToFrom(preClearEventSignal_, iOther.preClearEventSignal_);
copySlotsToFromReverse(postClearEventSignal_, iOther.postClearEventSignal_);

copySlotsToFrom(prePathEventSignal_, iOther.prePathEventSignal_);
copySlotsToFromReverse(postPathEventSignal_, iOther.postPathEventSignal_);

Expand Down

0 comments on commit 645964a

Please sign in to comment.