Skip to content

Commit

Permalink
Extend global::OutputModule to support ExternalWork
Browse files Browse the repository at this point in the history
  • Loading branch information
Dr15Jones committed Jul 21, 2021
1 parent 968e15f commit 9199604
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 5 deletions.
7 changes: 6 additions & 1 deletion FWCore/Framework/interface/global/OutputModuleBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ namespace edm {
void doEndStream(StreamID id) { doEndStream_(id); }

bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
void doAcquire(EventTransitionInfo const&,
ActivityRegistry*,
ModuleCallingContext const*,
WaitingTaskWithArenaHolder&);
//For now this is a placeholder
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
ModuleCallingContext const& iModuleCallingContext,
Expand Down Expand Up @@ -264,10 +268,11 @@ namespace edm {
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
virtual void doRespondToOpenInputFile_(FileBlock const&) {}
virtual void doRespondToCloseInputFile_(FileBlock const&) {}
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {}

virtual void setProcessesWithSelectedMergeableRunProducts(std::set<std::string> const&) {}

bool hasAcquire() const { return false; }
virtual bool hasAcquire() const { return false; }
bool hasAccumulator() const { return false; }

void keepThisBranch(BranchDescription const& desc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@ namespace edm {
std::unique_ptr<std::shared_ptr<C>[]> caches_;
};

template <typename T>
class ExternalWork : public virtual T {
public:
ExternalWork(edm::ParameterSet const& iPSet) : OutputModuleBase(iPSet) {}
ExternalWork(ExternalWork const&) = delete;
ExternalWork& operator=(ExternalWork const&) = delete;
~ExternalWork() noexcept(false) override{};

private:
bool hasAcquire() const override { return true; }

void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskWithArenaHolder& holder) final {
acquire(id, event, holder);
}

virtual void acquire(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder) const = 0;
};

template <typename T>
struct AbilityToImplementor;

Expand All @@ -153,6 +171,11 @@ namespace edm {
typedef edm::global::outputmodule::StreamCacheHolder<edm::global::OutputModuleBase, C> Type;
};

template <>
struct AbilityToImplementor<edm::ExternalWork> {
typedef edm::global::outputmodule::ExternalWork<edm::global::OutputModuleBase> Type;
};

} // namespace outputmodule
} // namespace global
} // namespace edm
Expand Down
22 changes: 18 additions & 4 deletions FWCore/Framework/src/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ namespace edm {
if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
//We need to run the selection in a different task so that
// we can prefetch the data needed for the selection
auto runTask =
WaitingTask* moduleTask =
new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());

//make sure the task is either run or destroyed
Expand All @@ -973,14 +973,28 @@ namespace edm {
private:
std::atomic<edm::WaitingTask*> m_task;
};
if constexpr (T::isEvent_) {
if (hasAcquire()) {
auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
ServiceWeakToken weakToken = token;
auto* group = task.group();
moduleTask = make_waiting_task(
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
WaitingTaskWithArenaHolder runTaskHolder(
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
t.execute();
});
}
}
auto* group = task.group();
auto ownRunTask = std::make_shared<DestroyTask>(runTask);
auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
ServiceWeakToken weakToken = token;
auto selectionTask =
make_waiting_task([ownRunTask, parentContext, info = transitionInfo, weakToken, group, this](
make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
std::exception_ptr const*) mutable {
ServiceRegistry::Operate guard(weakToken.lock());
prefetchAsync<T>(WaitingTaskHolder(*group, ownRunTask->release()),
prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
weakToken.lock(),
parentContext,
info,
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/src/WorkerT.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ namespace edm {
module_->doAcquire(info, activityRegistry(), mcc, holder);
}

template <>
inline void WorkerT<global::OutputModuleBase>::implDoAcquire(EventTransitionInfo const& info,
ModuleCallingContext const* mcc,
WaitingTaskWithArenaHolder& holder) {
module_->doAcquire(info, activityRegistry(), mcc, holder);
}

template <>
inline void WorkerT<stream::EDProducerAdaptorBase>::implDoAcquire(EventTransitionInfo const& info,
ModuleCallingContext const* mcc,
Expand Down
11 changes: 11 additions & 0 deletions FWCore/Framework/src/global/OutputModuleBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "FWCore/Framework/interface/TriggerNamesService.h"
#include "FWCore/Framework/src/EventSignalsSentry.h"
#include "FWCore/Framework/src/PreallocationConfiguration.h"
#include "FWCore/Framework/src/EventAcquireSignalsSentry.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
Expand Down Expand Up @@ -276,6 +277,16 @@ namespace edm {
return true;
}

void OutputModuleBase::doAcquire(EventTransitionInfo const& info,
ActivityRegistry* act,
ModuleCallingContext const* mcc,
WaitingTaskWithArenaHolder& holder) {
EventForOutput e(info, moduleDescription_, mcc);
e.setConsumer(this);
EventAcquireSignalsSentry sentry(act, mcc);
this->doAcquire_(e.streamID(), e, holder);
}

bool OutputModuleBase::doBeginRun(RunTransitionInfo const& info, ModuleCallingContext const* mcc) {
RunForOutput r(info, moduleDescription_, mcc, false);
r.setConsumer(this);
Expand Down

0 comments on commit 9199604

Please sign in to comment.