Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Alpaka ESProducers asynchronous for non-host backends #47034

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace cms::alpakatools {
};
}
```
Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer (currently the synchronization blocks, but work is ongoing to make it non-blocking).
Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer in a non-blocking way.

The `CopyToDevice` class template is partially specialized for all `PortableCollection` instantiations.

Expand Down
21 changes: 9 additions & 12 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,21 @@
#include "DataFormats/Common/interface/DeviceProduct.h"
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"

#include <type_traits>

namespace ALPAKA_ACCELERATOR_NAMESPACE::detail {
// host synchronous backends can use TProduct directly
// all device and asynchronous backends need to be wrapped
inline constexpr bool useProductDirectly =
std::is_same_v<Platform, alpaka::PlatformCpu> and std::is_same_v<Queue, alpaka::QueueCpuBlocking>;

/**
* This "trait" class abstracts the actual product type put in the
* Type alias for the actual product type put in the
* edm::Event.
*/
template <typename TProduct>
struct DeviceProductType {
#ifdef ALPAKA_ACC_CPU_B_SEQ_T_SEQ_ENABLED
// host synchronous backends can use TProduct directly
using type = TProduct;
#else
// all device and asynchronous backends need to be wrapped
using type = edm::DeviceProduct<TProduct>;
#endif
};
using DeviceProductType = std::conditional_t<useProductDirectly, TProduct, edm::DeviceProduct<TProduct>>;

template <typename TProduct>
inline constexpr bool useProductDirectly = std::is_same_v<typename DeviceProductType<TProduct>::type, TProduct>;
} // namespace ALPAKA_ACCELERATOR_NAMESPACE::detail

#endif
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/interface/alpaka/EDGetToken.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
*/
template <typename TProduct>
class EDGetToken {
using ProductType = typename detail::DeviceProductType<TProduct>::type;
using ProductType = detail::DeviceProductType<TProduct>;

public:
constexpr EDGetToken() = default;
Expand Down
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
*/
template <typename TProduct>
class EDPutToken {
using ProductType = typename detail::DeviceProductType<TProduct>::type;
using ProductType = detail::DeviceProductType<TProduct>;

public:
constexpr EDPutToken() noexcept = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@
#include <type_traits>

namespace ALPAKA_ACCELERATOR_NAMESPACE::detail {
// host backends can use TProduct directly
// all device backends need to be wrapped
inline constexpr bool useESProductDirectly = std::is_same_v<Platform, alpaka::PlatformCpu>;

/**
* This "trait" class abstracts the actual product type put in an
* Type alias for the actual product type put in an
* EventSetup record
*/
template <typename TProduct>
struct ESDeviceProductType {
using type = std::conditional_t<std::is_same_v<Platform, alpaka::PlatformCpu>,
// host backends can use TProduct directly
TProduct,
// all device backends need to be wrapped
ESDeviceProduct<TProduct>>;
};

template <typename TProduct>
inline constexpr bool useESProductDirectly = std::is_same_v<typename ESDeviceProductType<TProduct>::type, TProduct>;
using ESDeviceProductType = std::conditional_t<useESProductDirectly, TProduct, ESDeviceProduct<TProduct>>;
} // namespace ALPAKA_ACCELERATOR_NAMESPACE::detail

#endif
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {

auto const& underlyingToken() const { return token_; }

using ProductType = typename detail::ESDeviceProductType<ESProduct>::type;
using ProductType = detail::ESDeviceProductType<ESProduct>;
edm::ESGetToken<ProductType, ESRecord> token_;
};
} // namespace ALPAKA_ACCELERATOR_NAMESPACE::device
Expand Down
54 changes: 38 additions & 16 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h
#define HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h

#include "FWCore/Framework/interface/ESProducer.h"
#include "FWCore/Framework/interface/ESProducerExternalWork.h"
#include "FWCore/Framework/interface/MakeDataException.h"
#include "FWCore/Framework/interface/produce_helpers.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
Expand All @@ -24,8 +24,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
* the the usual Record argument. For producing a device product,
* the produce funtion should have device::Record<Record> argument.
*/
class ESProducer : public edm::ESProducer {
using Base = edm::ESProducer;
class ESProducer : public edm::ESProducerExternalWork {
using Base = edm::ESProducerExternalWork;

public:
static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Expand All @@ -45,7 +45,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
auto setWhatProduced(T* iThis, TReturn (T ::*iMethod)(TRecord const&), edm::es::Label const& label = {}) {
auto cc = Base::setWhatProduced(iThis, iMethod, label);
using TProduct = typename edm::eventsetup::produce::smart_pointer_traits<TReturn>::type;
if constexpr (not detail::useESProductDirectly<TProduct>) {
if constexpr (not detail::useESProductDirectly) {
// for device backends add the copy to device
auto tokenPtr = std::make_shared<edm::ESGetToken<TProduct, TRecord>>();
auto ccDev = setWhatProducedDevice<TRecord>(
Expand All @@ -69,12 +69,16 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
TReturn (T ::*iMethod)(device::Record<TRecord> const&),
edm::es::Label const& label = {}) {
using TProduct = typename edm::eventsetup::produce::smart_pointer_traits<TReturn>::type;
if constexpr (detail::useESProductDirectly<TProduct>) {
if constexpr (detail::useESProductDirectly) {
return Base::setWhatProduced(
[iThis, iMethod](TRecord const& record) {
auto const& devices = cms::alpakatools::devices<Platform>();
assert(devices.size() == 1);
device::Record<TRecord> const deviceRecord(record, devices.front());
static_assert(std::is_same_v<std::remove_cvref_t<decltype(deviceRecord.queue())>,
alpaka::Queue<Device, alpaka::Blocking>>,
"Non-blocking queue when trying to use ES data product directly. This might indicate a "
"need to extend the Alpaka ESProducer base class.");
return std::invoke(iMethod, iThis, deviceRecord);
},
label);
Expand All @@ -93,12 +97,12 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
using TProduct = typename edm::eventsetup::produce::smart_pointer_traits<TReturn>::type;
using ProductType = ESDeviceProduct<TProduct>;
using ReturnType = detail::ESDeviceProductWithStorage<TProduct, TReturn>;
return Base::setWhatProduced(
[function = std::forward<TFunc>(func)](TRecord const& record) -> std::unique_ptr<ProductType> {
return Base::setWhatAcquiredProducedWithLambda(
// acquire() part
[function = std::forward<TFunc>(func), synchronize = synchronize_](TRecord const& record,
edm::WaitingTaskWithArenaHolder holder) {
// TODO: move the multiple device support into EventSetup system itself
auto const& devices = cms::alpakatools::devices<Platform>();
std::vector<std::shared_ptr<Queue>> queues;
queues.reserve(devices.size());
auto ret = std::make_unique<ReturnType>(devices.size());
bool allnull = true;
bool anynull = false;
Expand All @@ -111,12 +115,28 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
} else {
anynull = true;
}
queues.push_back(deviceRecord.queuePtr());
}
// TODO: to be changed asynchronous later
for (auto& queuePtr : queues) {
alpaka::wait(*queuePtr);
if (synchronize) {
alpaka::wait(deviceRecord.queue());
} else {
enqueueCallback(deviceRecord.queue(), std::move(holder));
}
// The Queue is returned to the QueueCache. The same
// Queue may be used for other work before the work
// enqueued here finishes. The only impact would be a
// (slight?) delay in the completion of the other work.
// Given that the ESProducers are expected to be mostly
// for host-to-device data copies, that are serialized
// anyway (at least on current NVIDIA), this should be
// reasonable behavior for now.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively the Queues could be stored in an std::vector like before, that is then moved from the "acquire" lambda to be "produce" lambda.

Copy link
Contributor

@fwyzard fwyzard Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue is returned to the QueueCache. The same Queue may be used for other work before the work enqueued here finishes.

Isn't this already a common behaviour with queues in the QueueCache ?
Or, in the other cases, queues are returned only once all work has been completed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue is returned to the QueueCache. The same Queue may be used for other work before the work enqueued here finishes.

Isn't this already a common behaviour with queues in the QueueCache ? Or, in the other cases, queues are returned only once all work has been completed ?

In the Event system case the queues are returned only after all work has been completed. This happens because the shared_ptr<Queue> are held in the Event data products, and they return the queue only after synchronization. (although I'd say this behavior is more of a result of other design decisions than being a deliberate choice by itself)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. It's the EventCache that has additional provisions to avoid reusing events that are not yet "ready".

IMHO these current changes are fine, because Event Setup transitions are supposed to be "rare" with respect to the Event work.
If we later notice any performance impact here, we can always add back the vector, or extend the QueueCache to only return queues that are "empty".

}
return std::tuple(std::move(ret), allnull, anynull);
},
// produce() part, called after the asynchronous work in all queues have finished
[](TRecord const& record, auto fromAcquire) -> std::unique_ptr<ProductType> {
auto [ret, allnull, anynull] = std::move(fromAcquire);
// The 'allnull'/'anynull' actions are in produce()
// to keep any destination memory in 'ret'
// alive until the asynchronous work has finished
if (allnull) {
return nullptr;
} else if (anynull) {
Expand All @@ -128,17 +148,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// make the EventSetup system itself aware of multiple
// devies (or memory spaces). I hope this exception
// would be good-enough until we get there.
ESProducer::throwSomeNullException();
throwSomeNullException();
}
return ret;
return std::move(ret);
},
label);
}

static void enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder);
static void throwSomeNullException();

std::string const moduleLabel_;
std::string const appendToDataLabel_;
bool const synchronize_;
};
} // namespace ALPAKA_ACCELERATOR_NAMESPACE

Expand Down
8 changes: 4 additions & 4 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
template <typename T>
T const& get(device::EDGetToken<T> const& token) const {
auto const& deviceProduct = constEvent_.get(token.underlyingToken());
if constexpr (detail::useProductDirectly<T>) {
if constexpr (detail::useProductDirectly) {
return deviceProduct;
} else {
// try to re-use queue from deviceProduct if our queue has not yet been used
Expand All @@ -90,7 +90,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
template <typename T>
edm::Handle<T> getHandle(device::EDGetToken<T> const& token) const {
auto deviceProductHandle = constEvent_.getHandle(token.underlyingToken());
if constexpr (detail::useProductDirectly<T>) {
if constexpr (detail::useProductDirectly) {
return deviceProductHandle;
} else {
if (not deviceProductHandle) {
Expand All @@ -114,7 +114,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
// The idea for Ref-like things in this domain differs from earlier Refs anyway
template <typename T, typename... Args>
void emplace(device::EDPutToken<T> const& token, Args&&... args) {
if constexpr (detail::useProductDirectly<T>) {
if constexpr (detail::useProductDirectly) {
event_->emplace(token.underlyingToken(), std::forward<Args>(args)...);
} else {
event_->emplace(token.underlyingToken(), metadata_, std::forward<Args>(args)...);
Expand All @@ -130,7 +130,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {

template <typename T>
void put(device::EDPutToken<T> const& token, std::unique_ptr<T> product) {
if constexpr (detail::useProductDirectly<T>) {
if constexpr (detail::useProductDirectly) {
event_->emplace(token.underlyingToken(), std::move(*product));
} else {
event_->emplace(token.underlyingToken(), metadata_, std::move(*product));
Expand Down
4 changes: 2 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/EventSetup.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
template <typename T, typename R>
T const& getData(device::ESGetToken<T, R> const& iToken) const {
auto const& product = setup_.getData(iToken.underlyingToken());
if constexpr (detail::useESProductDirectly<T>) {
if constexpr (detail::useESProductDirectly) {
return product;
} else {
return product.get(device_);
Expand All @@ -53,7 +53,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
template <typename T, typename R>
edm::ESHandle<T> getHandle(device::ESGetToken<T, R> const& iToken) const {
auto handle = setup_.getHandle(iToken.underlyingToken());
if constexpr (detail::useESProductDirectly<T>) {
if constexpr (detail::useESProductDirectly) {
return handle;
} else {
if (not handle) {
Expand Down
4 changes: 1 addition & 3 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "FWCore/Utilities/interface/Transition.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/QueueCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaInterface/interface/Backend.h"
#include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h"
Expand Down Expand Up @@ -101,7 +99,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// can think of it later if really needed
template <typename TProduct, typename TToken, edm::Transition Tr>
edm::EDPutTokenT<TToken> deviceProduces(std::string instanceName) {
if constexpr (detail::useProductDirectly<TProduct>) {
if constexpr (detail::useProductDirectly) {
return Base::template produces<TToken, Tr>(std::move(instanceName));
} else {
edm::EDPutTokenT<TToken> token = Base::template produces<TToken, Tr>(instanceName);
Expand Down
6 changes: 3 additions & 3 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/Record.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
template <typename TProduct, typename TDepRecord>
edm::ESHandle<TProduct> getHandle(device::ESGetToken<TProduct, TDepRecord> const& iToken) const {
auto handle = record_.getHandle(iToken.underlyingToken());
if constexpr (detail::useESProductDirectly<TProduct>) {
if constexpr (detail::useESProductDirectly) {
return handle;
} else {
if (not handle) {
Expand All @@ -67,7 +67,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
template <typename TProduct, typename TDepRecord>
edm::ESTransientHandle<TProduct> getTransientHandle(device::ESGetToken<TProduct, TDepRecord> const& iToken) const {
auto handle = record_.getTransientHandle(iToken.underlyingToken());
if constexpr (detail::useESProductDirectly<TProduct>) {
if constexpr (detail::useESProductDirectly) {
return handle;
} else {
if (not handle) {
Expand All @@ -90,7 +90,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
template <typename TProduct, typename TDepRecord>
TProduct const& get(device::ESGetToken<TProduct, TDepRecord> const& iToken) const {
auto const& product = record_.get(iToken.underlyingToken());
if constexpr (detail::useESProductDirectly<TProduct>) {
if constexpr (detail::useESProductDirectly) {
return product;
} else {
return product.get(alpaka::getDev(*queue_));
Expand Down
21 changes: 20 additions & 1 deletion HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
#include "FWCore/Concurrency/interface/Async.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"
#include "HeterogeneousCore/AlpakaInterface/interface/HostOnlyTask.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
ESProducer::ESProducer(edm::ParameterSet const& iConfig)
: moduleLabel_(iConfig.getParameter<std::string>("@module_label")),
appendToDataLabel_(iConfig.getParameter<std::string>("appendToDataLabel")) {}
appendToDataLabel_(iConfig.getParameter<std::string>("appendToDataLabel")),
// The 'synchronize' parameter can be unset in Alpaka modules
// specified with the namespace prefix instead if '@alpaka'
// suffix
synchronize_(
iConfig.getUntrackedParameter<edm::ParameterSet>("alpaka").getUntrackedParameter("synchronize", false)) {}

void ESProducer::enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder) {
edm::Service<edm::Async> async;
auto event = cms::alpakatools::getEventCache<Event>().get(alpaka::getDev(queue));
alpaka::enqueue(queue, *event);
async->runAsync(
std::move(holder),
[event = std::move(event)]() mutable { alpaka::wait(*event); },
[]() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::ESProducer::enqueueCallback()"; });
}

void ESProducer::throwSomeNullException() {
throw edm::Exception(edm::errors::UnimplementedFeature)
Expand Down