Skip to content

Commit

Permalink
Add StreamID parameter to Transformer callback functions
Browse files Browse the repository at this point in the history
The StreamID will be needed to implement implicit host-to-device
copies for Alpaka EDProducers
  • Loading branch information
makortel committed Jan 3, 2025
1 parent 9fb7223 commit 0e28aeb
Show file tree
Hide file tree
Showing 17 changed files with 88 additions and 72 deletions.
6 changes: 4 additions & 2 deletions FWCore/Framework/interface/TransformerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "FWCore/Utilities/interface/EDPutToken.h"
#include "FWCore/Utilities/interface/SoATuple.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/TypeID.h"
#include "FWCore/Utilities/interface/ProductResolverIndex.h"

Expand Down Expand Up @@ -38,8 +39,9 @@ namespace edm {
protected:
//The function takes the WrapperBase corresponding to the data product from the EDPutToken
// and returns the WrapperBase associated to the id and instanceName
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(std::any)>;
using PreTransformFunction = std::function<std::any(edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;
using TransformFunction = std::function<std::unique_ptr<edm::WrapperBase>(edm::StreamID, std::any)>;
using PreTransformFunction =
std::function<std::any(edm::StreamID, edm::WrapperBase const&, edm::WaitingTaskWithArenaHolder)>;

void registerTransformImp(ProducerBase&, EDPutToken, const TypeID& id, std::string instanceName, TransformFunction);
void registerTransformAsyncImp(
Expand Down
20 changes: 11 additions & 9 deletions FWCore/Framework/interface/global/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -493,20 +493,22 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{},
f(std::any_cast<CacheTypeT>(iCache)));
f(id, std::any_cast<CacheTypeT>(iCache)));
});
}

Expand Down
19 changes: 10 additions & 9 deletions FWCore/Framework/interface/limited/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -481,20 +481,21 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT = decltype(iPre(std::declval<StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
35 changes: 19 additions & 16 deletions FWCore/Framework/interface/one/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,38 +354,41 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::WrapperBase const& iGotProduct) {
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{},
f(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product()));
});
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

template <typename G, typename P, typename F>
void registerTransformAsync(edm::EDPutTokenT<G> iToken,
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
20 changes: 11 additions & 9 deletions FWCore/Framework/interface/stream/implementors.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,17 @@ namespace edm {

template <typename G, typename F>
void registerTransform(edm::EDPutTokenT<G> iToken, F iF, std::string productInstance = std::string()) {
using ReturnTypeT = decltype(iF(std::declval<G>()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<G>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[f = std::move(iF)](std::any const& iGotProduct) {
[f = std::move(iF)](edm::StreamID id, std::any const& iGotProduct) {
auto pGotProduct = std::any_cast<edm::WrapperBase const*>(iGotProduct);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(
WrapperBase::Emplace{}, f(*static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
WrapperBase::Emplace{}, f(id, *static_cast<edm::Wrapper<G> const*>(pGotProduct)->product()));
});
}

Expand All @@ -347,20 +347,22 @@ namespace edm {
P iPre,
F iF,
std::string productInstance = std::string()) {
using CacheTypeT = decltype(iPre(std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<CacheTypeT>()));
using CacheTypeT =
decltype(iPre(std::declval<edm::StreamID>(), std::declval<G>(), WaitingTaskWithArenaHolder()));
using ReturnTypeT = decltype(iF(std::declval<edm::StreamID>(), std::declval<CacheTypeT>()));
TypeID returnType(typeid(ReturnTypeT));
TransformerBase::registerTransformAsyncImp(
*this,
EDPutToken(iToken),
returnType,
std::move(productInstance),
[p = std::move(iPre)](edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(*static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
[p = std::move(iPre)](
edm::StreamID id, edm::WrapperBase const& iGotProduct, WaitingTaskWithArenaHolder iHolder) {
return std::any(p(id, *static_cast<edm::Wrapper<G> const&>(iGotProduct).product(), std::move(iHolder)));
},
[f = std::move(iF)](std::any const& iCache) {
[f = std::move(iF)](edm::StreamID id, std::any const& iCache) {
auto cache = std::any_cast<CacheTypeT>(iCache);
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(cache));
return std::make_unique<edm::Wrapper<ReturnTypeT>>(WrapperBase::Emplace{}, f(id, cache));
});
}

Expand Down
17 changes: 11 additions & 6 deletions FWCore/Framework/src/TransformerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"

#include <optional>

namespace {
Expand Down Expand Up @@ -116,7 +118,8 @@ namespace edm {
std::optional<decltype(iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex)))>
handle;
//transform acquiring signal
TransformAcquiringSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformAcquiringSignalSentry sentry(iAct, streamContext, mcc);
CMS_SA_ALLOW try {
handle = iEvent.get(transformInfo_.get<kType>(iIndex), transformInfo_.get<kResolverIndex>(iIndex));
} catch (...) {
Expand All @@ -133,15 +136,16 @@ namespace edm {
} else {
//transform signal
auto mcc = iEvent.moduleCallingContext();
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformSignalSentry sentry(iAct, streamContext, mcc);
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
transformInfo_.get<kTransform>(iIndex)(std::move(*cache)),
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(*cache)),
handle);
}
});
WaitingTaskWithArenaHolder wta(*iHolder.group(), nextTask);
CMS_SA_ALLOW try {
*cache = transformInfo_.get<kPreTransform>(iIndex)(*(handle->wrapper()), wta);
*cache = transformInfo_.get<kPreTransform>(iIndex)(streamContext.streamID(), *(handle->wrapper()), wta);
} catch (...) {
wta.doneWaiting(std::current_exception());
}
Expand All @@ -153,9 +157,10 @@ namespace edm {
if (handle.wrapper()) {
std::any v = handle.wrapper();
//transform signal
TransformSignalSentry sentry(iAct, *mcc.getStreamContext(), mcc);
auto const& streamContext = *mcc.getStreamContext();
TransformSignalSentry sentry(iAct, streamContext, mcc);
iEvent.put(iBase.putTokenIndexToProductResolverIndex()[transformInfo_.get<kToken>(iIndex).index()],
transformInfo_.get<kTransform>(iIndex)(std::move(v)),
transformInfo_.get<kTransform>(iIndex)(streamContext.streamID(), std::move(v)),
handle);
}
} catch (...) {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/global_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class testGlobalFilter : public CppUnit::TestFixture {
public:
TransformProd(edm::ParameterSet const&) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand All @@ -380,8 +380,8 @@ class testGlobalFilter : public CppUnit::TestFixture {
token_ = produces<float>();
registerTransformAsync(
token_,
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](IntHolder iWaitValue) { return iWaitValue.value_; });
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/global_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ class testGlobalProducer : public CppUnit::TestFixture {
public:
TransformProd(edm::ParameterSet const&) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand All @@ -347,8 +347,8 @@ class testGlobalProducer : public CppUnit::TestFixture {
token_ = produces<float>();
registerTransformAsync(
token_,
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](IntHolder iWaitValue) { return iWaitValue.value_; });
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/limited_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class testLimitedFilter : public CppUnit::TestFixture {
TransformProd(edm::ParameterSet const&)
: edm::limited::EDFilterBase(s_pset), edm::limited::EDFilter<edm::Transformer>(s_pset) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

bool filter(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/test/limited_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class testLimitedProducer : public CppUnit::TestFixture {
TransformProd(edm::ParameterSet const&)
: edm::limited::EDProducerBase(s_pset), edm::limited::EDProducer<edm::Transformer>(s_pset) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

void produce(edm::StreamID, edm::Event& iEvent, edm::EventSetup const&) const {
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/stream_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class testStreamProducer : public CppUnit::TestFixture {
public:
TransformProd(edm::ParameterSet const&) {
token_ = produces<float>();
registerTransform(token_, [](float iV) { return int(iV); });
registerTransform(token_, [](edm::StreamID, float iV) { return int(iV); });
}

void produce(edm::Event& iEvent, edm::EventSetup const&) final {
Expand All @@ -403,8 +403,8 @@ class testStreamProducer : public CppUnit::TestFixture {
token_ = produces<float>();
registerTransformAsync(
token_,
[](float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](IntHolder iWaitValue) { return iWaitValue.value_; });
[](edm::StreamID, float iV, edm::WaitingTaskWithArenaHolder iHolder) { return IntHolder(iV); },
[](edm::StreamID, IntHolder iWaitValue) { return iWaitValue.value_; });
}

void produce(edm::Event& iEvent, edm::EventSetup const&) final {
Expand Down
Loading

0 comments on commit 0e28aeb

Please sign in to comment.