Skip to content

Commit

Permalink
Make Alpaka ESProducers asynchronous for non-host backends
Browse files Browse the repository at this point in the history
  • Loading branch information
makortel committed Jan 6, 2025
1 parent c73e2f4 commit 24181b3
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 19 deletions.
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ namespace cms::alpakatools {
};
}
```
Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer (currently the synchronization blocks, but work is ongoing to make it non-blocking).
Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer in a non-blocking way.

The `CopyToDevice` class template is partially specialized for all `PortableCollection` instantiations.

Expand Down
51 changes: 36 additions & 15 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h
#define HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h

#include "FWCore/Framework/interface/ESProducer.h"
#include "FWCore/Framework/interface/ESProducerExternalWork.h"
#include "FWCore/Framework/interface/MakeDataException.h"
#include "FWCore/Framework/interface/produce_helpers.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
Expand All @@ -12,7 +12,6 @@
#include "HeterogeneousCore/AlpakaInterface/interface/CopyToDevice.h"

#include <functional>
#include <type_traits>

namespace ALPAKA_ACCELERATOR_NAMESPACE {
/**
Expand All @@ -25,8 +24,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
* the usual Record argument. For producing a device product,
* the produce function should have a device::Record<Record> argument.
*/
class ESProducer : public edm::ESProducer {
using Base = edm::ESProducer;
class ESProducer : public edm::ESProducerExternalWork {
using Base = edm::ESProducerExternalWork;

public:
static void prevalidate(edm::ConfigurationDescriptions& descriptions) {
Expand Down Expand Up @@ -76,6 +75,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
auto const& devices = cms::alpakatools::devices<Platform>();
assert(devices.size() == 1);
device::Record<TRecord> const deviceRecord(record, devices.front());
static_assert(std::is_same_v<std::remove_cvref_t<decltype(deviceRecord.queue())>,
alpaka::Queue<Device, alpaka::Blocking>>,
"Non-blocking queue when trying to use ES data product directly. This might indicate a "
"need to extend the Alpaka ESProducer base class.");
return std::invoke(iMethod, iThis, deviceRecord);
},
label);
Expand All @@ -94,12 +97,12 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
using TProduct = typename edm::eventsetup::produce::smart_pointer_traits<TReturn>::type;
using ProductType = ESDeviceProduct<TProduct>;
using ReturnType = detail::ESDeviceProductWithStorage<TProduct, TReturn>;
return Base::setWhatProduced(
[function = std::forward<TFunc>(func)](TRecord const& record) -> std::unique_ptr<ProductType> {
return Base::setWhatAcquiredProducedWithLambda(
// acquire() part
[function = std::forward<TFunc>(func), synchronize = synchronize_](TRecord const& record,
edm::WaitingTaskWithArenaHolder holder) {
// TODO: move the multiple device support into EventSetup system itself
auto const& devices = cms::alpakatools::devices<Platform>();
std::vector<std::shared_ptr<Queue>> queues;
queues.reserve(devices.size());
auto ret = std::make_unique<ReturnType>(devices.size());
bool allnull = true;
bool anynull = false;
Expand All @@ -112,12 +115,28 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
} else {
anynull = true;
}
queues.push_back(deviceRecord.queuePtr());
}
// TODO: to be changed asynchronous later
for (auto& queuePtr : queues) {
alpaka::wait(*queuePtr);
if (synchronize) {
alpaka::wait(deviceRecord.queue());
} else {
enqueueCallback(deviceRecord.queue(), std::move(holder));
}
// The Queue is returned to the QueueCache. The same
// Queue may be used for other work before the work
// enqueued here finishes. The only impact would be a
// (slight?) delay in the completion of the other work.
// Given that the ESProducers are expected to be mostly
// for host-to-device data copies, that are serialized
// anyway (at least on current NVIDIA), this should be
// reasonable behavior for now.
}
return std::tuple(std::move(ret), allnull, anynull);
},
// produce() part, called after the asynchronous work in all queues has finished
[](TRecord const& record, auto fromAcquire) -> std::unique_ptr<ProductType> {
auto [ret, allnull, anynull] = std::move(fromAcquire);
// The 'allnull'/'anynull' actions are in produce()
// to keep any destination memory in 'ret'
// alive until the asynchronous work has finished
if (allnull) {
return nullptr;
} else if (anynull) {
Expand All @@ -129,17 +148,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// make the EventSetup system itself aware of multiple
// devices (or memory spaces). I hope this exception
// would be good-enough until we get there.
ESProducer::throwSomeNullException();
throwSomeNullException();
}
return ret;
return std::move(ret);
},
label);
}

static void enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder);
static void throwSomeNullException();

std::string const moduleLabel_;
std::string const appendToDataLabel_;
bool const synchronize_;
};
} // namespace ALPAKA_ACCELERATOR_NAMESPACE

Expand Down
2 changes: 0 additions & 2 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "FWCore/Utilities/interface/Transition.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/QueueCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h"
#include "HeterogeneousCore/AlpakaInterface/interface/Backend.h"
#include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h"
Expand Down
21 changes: 20 additions & 1 deletion HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
#include "FWCore/Concurrency/interface/Async.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h"
#include "HeterogeneousCore/AlpakaInterface/interface/HostOnlyTask.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
ESProducer::ESProducer(edm::ParameterSet const& iConfig)
: moduleLabel_(iConfig.getParameter<std::string>("@module_label")),
appendToDataLabel_(iConfig.getParameter<std::string>("appendToDataLabel")) {}
appendToDataLabel_(iConfig.getParameter<std::string>("appendToDataLabel")),
// The 'synchronize' parameter can be unset in Alpaka modules
// specified with the namespace prefix instead of the '@alpaka'
// suffix
synchronize_(
iConfig.getUntrackedParameter<edm::ParameterSet>("alpaka").getUntrackedParameter("synchronize", false)) {}

// Enqueues an event on 'queue' and hands 'holder' to the edm::Async service
// together with a callable that waits on that event.
// NOTE(review): presumably the Async service notifies 'holder' once the wait
// returns -- confirm against edm::Async::runAsync().
void ESProducer::enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder) {
  edm::Service<edm::Async> asyncService;

  // Take an event from the event cache for the device of 'queue' and enqueue it
  auto completionEvent = cms::alpakatools::getEventCache<Event>().get(alpaka::getDev(queue));
  alpaka::enqueue(queue, *completionEvent);

  // The closure takes over the event handle and waits on it when invoked
  auto waitForEvent = [completionEvent = std::move(completionEvent)]() mutable { alpaka::wait(*completionEvent); };
  // Text passed to the Async service as the third argument of runAsync()
  auto describeContext = []() {
    return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::ESProducer::enqueueCallback()";
  };

  asyncService->runAsync(std::move(holder), std::move(waitForEvent), std::move(describeContext));
}

void ESProducer::throwSomeNullException() {
throw edm::Exception(edm::errors::UnimplementedFeature)
Expand Down

0 comments on commit 24181b3

Please sign in to comment.