Skip to content

Commit

Permalink
Fix: Make Services available to ES Modules
Browse files Browse the repository at this point in the history
Now the ServiceToken is used before calling the methods of an ES module.
  • Loading branch information
Dr15Jones authored and fwyzard committed Jul 15, 2020
1 parent 3f09c90 commit fc92bcd
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 76 deletions.
50 changes: 29 additions & 21 deletions FWCore/Framework/interface/Callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"

namespace edm {
namespace eventsetup {
Expand Down Expand Up @@ -66,39 +68,40 @@ namespace edm {

void prefetchAsync(WaitingTask* iTask,
EventSetupRecordImpl const* iRecord,
EventSetupImpl const* iEventSetupImpl) {
EventSetupImpl const* iEventSetupImpl,
ServiceToken const& token) {
bool expected = false;
auto doPrefetch = wasCalledForThisRecord_.compare_exchange_strong(expected, true);
taskList_.add(iTask);
if (doPrefetch) {
if UNLIKELY (producer_->hasMayConsumes()) {
//after prefetching need to do the mayGet
auto mayGetTask = edm::make_waiting_task(
tbb::task::allocate_root(), [this, iRecord, iEventSetupImpl](std::exception_ptr const* iExcept) {
tbb::task::allocate_root(), [this, iRecord, iEventSetupImpl, token](std::exception_ptr const* iExcept) {
if (iExcept) {
runProducerAsync(iExcept, iRecord, iEventSetupImpl);
runProducerAsync(iExcept, iRecord, iEventSetupImpl, token);
return;
}
if (handleMayGet(iRecord, iEventSetupImpl)) {
auto runTask =
edm::make_waiting_task(tbb::task::allocate_root(),
[this, iRecord, iEventSetupImpl](std::exception_ptr const* iExcept) {
runProducerAsync(iExcept, iRecord, iEventSetupImpl);
});
prefetchNeededDataAsync(runTask, iEventSetupImpl, &((*postMayGetProxies_).front()));
auto runTask = edm::make_waiting_task(
tbb::task::allocate_root(),
[this, iRecord, iEventSetupImpl, token](std::exception_ptr const* iExcept) {
runProducerAsync(iExcept, iRecord, iEventSetupImpl, token);
});
prefetchNeededDataAsync(runTask, iEventSetupImpl, &((*postMayGetProxies_).front()), token);
} else {
runProducerAsync(iExcept, iRecord, iEventSetupImpl);
runProducerAsync(iExcept, iRecord, iEventSetupImpl, token);
}
});

//Get everything we can before knowing about the mayGets
prefetchNeededDataAsync(mayGetTask, iEventSetupImpl, getTokenIndices());
prefetchNeededDataAsync(mayGetTask, iEventSetupImpl, getTokenIndices(), token);
} else {
auto task = edm::make_waiting_task(tbb::task::allocate_root(),
[this, iRecord, iEventSetupImpl](std::exception_ptr const* iExcept) {
runProducerAsync(iExcept, iRecord, iEventSetupImpl);
});
prefetchNeededDataAsync(task, iEventSetupImpl, getTokenIndices());
auto task = edm::make_waiting_task(
tbb::task::allocate_root(), [this, iRecord, iEventSetupImpl, token](std::exception_ptr const* iExcept) {
runProducerAsync(iExcept, iRecord, iEventSetupImpl, token);
});
prefetchNeededDataAsync(task, iEventSetupImpl, getTokenIndices(), token);
}
}
}
Expand Down Expand Up @@ -132,14 +135,17 @@ namespace edm {
ESProxyIndex const* getTokenIndices() const { return producer_->getTokenIndices(id_); }

private:
void prefetchNeededDataAsync(WaitingTask* task, EventSetupImpl const* iImpl, ESProxyIndex const* proxies) const {
void prefetchNeededDataAsync(WaitingTask* task,
EventSetupImpl const* iImpl,
ESProxyIndex const* proxies,
edm::ServiceToken const& token) const {
WaitingTaskHolder h(task);
auto recs = producer_->getTokenRecordIndices(id_);
auto n = producer_->numberOfTokenIndices(id_);
for (size_t i = 0; i != n; ++i) {
auto rec = iImpl->findImpl(recs[i]);
if (rec) {
rec->prefetchAsync(task, proxies[i], iImpl);
rec->prefetchAsync(task, proxies[i], iImpl, token);
}
}
}
Expand All @@ -154,22 +160,24 @@ namespace edm {

void runProducerAsync(std::exception_ptr const* iExcept,
EventSetupRecordImpl const* iRecord,
EventSetupImpl const* iEventSetupImpl) {
EventSetupImpl const* iEventSetupImpl,
ServiceToken const& token) {
if (iExcept) {
//The cache held by the CallbackProxy was already set to invalid at the beginning of the IOV
taskList_.doneWaiting(*iExcept);
return;
}
producer_->queue().push([this, iRecord, iEventSetupImpl]() {
producer_->queue().push([this, iRecord, iEventSetupImpl, token]() {
std::exception_ptr exceptPtr;
try {
convertException::wrap([this, iRecord, iEventSetupImpl] {
convertException::wrap([this, iRecord, iEventSetupImpl, token] {
auto proxies = getTokenIndices();
if (postMayGetProxies_) {
proxies = &((*postMayGetProxies_).front());
}
TRecord rec;
rec.setImpl(iRecord, transitionID(), proxies, iEventSetupImpl, true);
ServiceRegistry::Operate operate(token);
decorator_.pre(rec);
storeReturnedValues((producer_->*method_)(rec));
decorator_.post(rec);
Expand Down
5 changes: 3 additions & 2 deletions FWCore/Framework/interface/CallbackProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ namespace edm::eventsetup {
void prefetchAsyncImpl(WaitingTask* iWaitTask,
const EventSetupRecordImpl& iRecord,
const DataKey&,
EventSetupImpl const* iEventSetupImpl) final {
EventSetupImpl const* iEventSetupImpl,
ServiceToken const& iToken) final {
assert(iRecord.key() == RecordT::keyForClass());
callback_->prefetchAsync(iWaitTask, &iRecord, iEventSetupImpl);
callback_->prefetchAsync(iWaitTask, &iRecord, iEventSetupImpl, iToken);
}

void const* getAfterPrefetchImpl() const final { return smart_pointer_traits::getPointer(data_); }
Expand Down
7 changes: 5 additions & 2 deletions FWCore/Framework/interface/DataProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace edm {
class ActivityRegistry;
class EventSetupImpl;
class WaitingTask;
class ServiceToken;

namespace eventsetup {
struct ComponentDescription;
Expand All @@ -47,7 +48,8 @@ namespace edm {
// ---------- const member functions ---------------------
bool cacheIsValid() const { return cacheIsValid_.load(std::memory_order_acquire); }

void prefetchAsync(WaitingTask*, EventSetupRecordImpl const&, DataKey const&, EventSetupImpl const*) const;
void prefetchAsync(
WaitingTask*, EventSetupRecordImpl const&, DataKey const&, EventSetupImpl const*, ServiceToken const&) const;

void doGet(EventSetupRecordImpl const&,
DataKey const&,
Expand Down Expand Up @@ -86,7 +88,8 @@ namespace edm {
virtual void prefetchAsyncImpl(WaitingTask*,
EventSetupRecordImpl const&,
DataKey const& iKey,
EventSetupImpl const*) = 0;
EventSetupImpl const*,
ServiceToken const&) = 0;

/** indicates that the Proxy should invalidate any cached information
as that information has 'expired' (i.e. we have moved to a new IOV)
Expand Down
7 changes: 5 additions & 2 deletions FWCore/Framework/interface/DataProxyTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
// user include files
#include "FWCore/Framework/interface/DataProxy.h"
#include "FWCore/Framework/interface/EventSetupRecord.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include <cassert>
#include <limits>
Expand All @@ -57,18 +58,20 @@ namespace edm {
void prefetchAsyncImpl(WaitingTask* iTask,
const EventSetupRecordImpl& iRecord,
const DataKey& iKey,
EventSetupImpl const* iEventSetupImpl) override {
EventSetupImpl const* iEventSetupImpl,
edm::ServiceToken const& iToken) override {
assert(iRecord.key() == RecordT::keyForClass());
bool expected = false;
bool doPrefetch = prefetching_.compare_exchange_strong(expected, true);
taskList_.add(iTask);

if (doPrefetch) {
tbb::task::spawn(*edm::make_waiting_task(
tbb::task::allocate_root(), [this, &iRecord, iKey, iEventSetupImpl](std::exception_ptr const*) {
tbb::task::allocate_root(), [this, &iRecord, iKey, iEventSetupImpl, iToken](std::exception_ptr const*) {
try {
RecordT rec;
rec.setImpl(&iRecord, std::numeric_limits<unsigned int>::max(), nullptr, iEventSetupImpl, true);
ServiceRegistry::Operate operate(iToken);
this->make(rec, iKey);
} catch (...) {
this->taskList_.doneWaiting(std::current_exception());
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/ESSourceDataProxyBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ namespace edm::eventsetup {
void prefetchAsyncImpl(edm::WaitingTask* iTask,
edm::eventsetup::EventSetupRecordImpl const&,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*) final;
edm::EventSetupImpl const*,
edm::ServiceToken const&) final;

// ---------- member data --------------------------------

Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/interface/EventSetupRecordImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace edm {
class ESInputTag;
class EventSetupImpl;
class WaitingTask;
class ServiceToken;

namespace eventsetup {
struct ComponentDescription;
Expand All @@ -89,7 +90,7 @@ namespace edm {
bool doGet(ESProxyIndex iProxyIndex, EventSetupImpl const*, bool aGetTransiently = false) const;

///prefetch the data to setup for subsequent calls to getImplementation
void prefetchAsync(WaitingTask* iTask, ESProxyIndex iProxyIndex, EventSetupImpl const*) const;
void prefetchAsync(WaitingTask* iTask, ESProxyIndex iProxyIndex, EventSetupImpl const*, ServiceToken const&) const;

/**returns true only if someone has already requested data for this key
and the data was retrieved
Expand Down
11 changes: 7 additions & 4 deletions FWCore/Framework/src/DataProxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "FWCore/Framework/interface/MakeDataException.h"
#include "FWCore/Framework/interface/EventSetupRecord.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"

#include "FWCore/Framework/src/esTaskArenas.h"
Expand Down Expand Up @@ -63,8 +64,9 @@ namespace edm {
void DataProxy::prefetchAsync(WaitingTask* iTask,
EventSetupRecordImpl const& iRecord,
DataKey const& iKey,
EventSetupImpl const* iEventSetupImpl) const {
const_cast<DataProxy*>(this)->prefetchAsyncImpl(iTask, iRecord, iKey, iEventSetupImpl);
EventSetupImpl const* iEventSetupImpl,
ServiceToken const& iToken) const {
const_cast<DataProxy*>(this)->prefetchAsyncImpl(iTask, iRecord, iKey, iEventSetupImpl, iToken);
}

void const* DataProxy::getAfterPrefetch(const EventSetupRecordImpl& iRecord,
Expand Down Expand Up @@ -98,8 +100,9 @@ namespace edm {
auto waitTask = edm::make_empty_waiting_task();
waitTask->set_ref_count(2);
auto waitTaskPtr = waitTask.get();
edm::esTaskArena().execute([this, waitTaskPtr, &iRecord, &iKey, iEventSetupImpl]() {
prefetchAsync(waitTaskPtr, iRecord, iKey, iEventSetupImpl);
auto token = ServiceRegistry::instance().presentToken();
edm::esTaskArena().execute([this, waitTaskPtr, &iRecord, &iKey, iEventSetupImpl, token]() {
prefetchAsync(waitTaskPtr, iRecord, iKey, iEventSetupImpl, token);
waitTaskPtr->decrement_ref_count();
waitTaskPtr->wait_for_all();
});
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Framework/src/ESSourceDataProxyBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
void edm::eventsetup::ESSourceDataProxyBase::prefetchAsyncImpl(edm::WaitingTask* iTask,
edm::eventsetup::EventSetupRecordImpl const& iRecord,
edm::eventsetup::DataKey const& iKey,
edm::EventSetupImpl const*) {
edm::EventSetupImpl const*,
edm::ServiceToken const&) {
bool expected = false;
auto doPrefetch = m_prefetching.compare_exchange_strong(expected, true);
m_waitingList.add(iTask);
Expand Down
5 changes: 3 additions & 2 deletions FWCore/Framework/src/EventSetupRecordImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,16 @@ namespace edm {

void EventSetupRecordImpl::prefetchAsync(WaitingTask* iTask,
ESProxyIndex iProxyIndex,
EventSetupImpl const* iEventSetupImpl) const {
EventSetupImpl const* iEventSetupImpl,
ServiceToken const& iToken) const {
if UNLIKELY (iProxyIndex.value() == std::numeric_limits<int>::max()) {
return;
}

const DataProxy* proxy = proxies_[iProxyIndex.value()];
if (nullptr != proxy) {
auto const& key = keysForProxies_[iProxyIndex.value()];
proxy->prefetchAsync(iTask, *this, key, iEventSetupImpl);
proxy->prefetchAsync(iTask, *this, key, iEventSetupImpl, iToken);
}
}

Expand Down
65 changes: 34 additions & 31 deletions FWCore/Framework/src/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ namespace edm {
//Need to be sure the ref count isn't set to 0 immediately
iTask->increment_ref_count();

esPrefetchAsync(iTask, iEventSetup, iTransition);
esPrefetchAsync(iTask, iEventSetup, iTransition, token);
for (auto const& item : items) {
ProductResolverIndex productResolverIndex = item.productResolverIndex();
bool skipCurrentProcess = item.skipCurrentProcess();
Expand Down Expand Up @@ -279,7 +279,10 @@ namespace edm {
choiceHolder.doneWaiting(std::exception_ptr{});
}

void Worker::esPrefetchAsync(WaitingTask* iTask, EventSetupImpl const& iImpl, Transition iTrans) {
void Worker::esPrefetchAsync(WaitingTask* iTask,
EventSetupImpl const& iImpl,
Transition iTrans,
edm::ServiceToken const& iToken) {
auto const& recs = esRecordsToGetFrom(iTrans);
auto const& items = esItemsToGetFrom(iTrans);

Expand All @@ -294,34 +297,34 @@ namespace edm {
// will work.
if UNLIKELY (tbb::this_task_arena::max_concurrency() == 1) {
//We spawn this first so that the other ES tasks are before it in the TBB queue
tbb::task::spawn(*make_functor_task(tbb::task::allocate_root(),
[this, task = edm::WaitingTaskHolder(iTask), iTrans, &iImpl]() mutable {
auto waitTask = edm::make_empty_waiting_task();
waitTask->set_ref_count(2);
auto waitTaskPtr = waitTask.get();
esTaskArena().execute([waitTaskPtr, this, iTrans, &iImpl]() {
auto const& recs = esRecordsToGetFrom(iTrans);
auto const& items = esItemsToGetFrom(iTrans);
waitTaskPtr->set_ref_count(2);
for (size_t i = 0; i != items.size(); ++i) {
if (recs[i] != ESRecordIndex{}) {
auto rec = iImpl.findImpl(recs[i]);
if (rec) {
rec->prefetchAsync(waitTaskPtr, items[i], &iImpl);
}
}
}
waitTaskPtr->decrement_ref_count();
waitTaskPtr->wait_for_all();
});

auto exPtr = waitTask->exceptionPtr();
if (exPtr) {
task.doneWaiting(*exPtr);
} else {
task.doneWaiting(std::exception_ptr{});
}
}));
tbb::task::spawn(*make_functor_task(
tbb::task::allocate_root(), [this, task = edm::WaitingTaskHolder(iTask), iTrans, &iImpl, iToken]() mutable {
auto waitTask = edm::make_empty_waiting_task();
waitTask->set_ref_count(2);
auto waitTaskPtr = waitTask.get();
esTaskArena().execute([waitTaskPtr, this, iTrans, &iImpl, iToken]() {
auto const& recs = esRecordsToGetFrom(iTrans);
auto const& items = esItemsToGetFrom(iTrans);
waitTaskPtr->set_ref_count(2);
for (size_t i = 0; i != items.size(); ++i) {
if (recs[i] != ESRecordIndex{}) {
auto rec = iImpl.findImpl(recs[i]);
if (rec) {
rec->prefetchAsync(waitTaskPtr, items[i], &iImpl, iToken);
}
}
}
waitTaskPtr->decrement_ref_count();
waitTaskPtr->wait_for_all();
});

auto exPtr = waitTask->exceptionPtr();
if (exPtr) {
task.doneWaiting(*exPtr);
} else {
task.doneWaiting(std::exception_ptr{});
}
}));
} else {
//We need iTask to run in the default arena since it is not an ES task
auto task =
Expand All @@ -340,7 +343,7 @@ namespace edm {
if (recs[i] != ESRecordIndex{}) {
auto rec = iImpl.findImpl(recs[i]);
if (rec) {
rec->prefetchAsync(task, items[i], &iImpl);
rec->prefetchAsync(task, items[i], &iImpl, iToken);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Framework/src/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ namespace edm {
edm::Transition);

bool needsESPrefetching(Transition iTrans) const noexcept { return not esItemsToGetFrom(iTrans).empty(); }
void esPrefetchAsync(WaitingTask* iHolder, EventSetupImpl const&, Transition);
void esPrefetchAsync(WaitingTask* iHolder, EventSetupImpl const&, Transition, ServiceToken const&);

void emitPostModuleEventPrefetchingSignal() {
actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
Expand Down Expand Up @@ -987,7 +987,7 @@ namespace edm {
}
}
});
esPrefetchAsync(afterPrefetch, es, T::transition_);
esPrefetchAsync(afterPrefetch, es, T::transition_, serviceToken);
} else {
if (auto queue = this->serializeRunModule()) {
queue.push(toDo);
Expand Down
Loading

0 comments on commit fc92bcd

Please sign in to comment.