diff --git a/HeterogeneousCore/AlpakaCore/README.md b/HeterogeneousCore/AlpakaCore/README.md index 4e0dad334c3e9..acbd00c9cb5c3 100644 --- a/HeterogeneousCore/AlpakaCore/README.md +++ b/HeterogeneousCore/AlpakaCore/README.md @@ -98,7 +98,7 @@ namespace cms::alpakatools { }; } ``` -Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer (currently the synchronization blocks, but work is ongoing to make it non-blocking). +Note that the destination (device-side) type `TDst` can be different from or the same as the source (host-side) type `TSrc` as far as the framework is concerned. For example, in the `PortableCollection` model the types are different. The `copyAsync()` member function is easiest to implement as a template over `TQueue`. The framework handles the necessary synchronization between the copy function and the consumer in a non-blocking way. The `CopyToDevice` class template is partially specialized for all `PortableCollection` instantiations. 
diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h index dfcbd66b527e0..6c812cff17253 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h @@ -1,7 +1,7 @@ #ifndef HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h #define HeterogeneousCore_AlpakaCore_interface_alpaka_ESProducer_h -#include "FWCore/Framework/interface/ESProducer.h" +#include "FWCore/Framework/interface/ESProducerExternalWork.h" #include "FWCore/Framework/interface/MakeDataException.h" #include "FWCore/Framework/interface/produce_helpers.h" #include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h" @@ -12,7 +12,6 @@ #include "HeterogeneousCore/AlpakaInterface/interface/CopyToDevice.h" #include -#include namespace ALPAKA_ACCELERATOR_NAMESPACE { /** @@ -25,8 +24,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { * the the usual Record argument. For producing a device product, * the produce funtion should have device::Record argument. */ - class ESProducer : public edm::ESProducer { - using Base = edm::ESProducer; + class ESProducer : public edm::ESProducerExternalWork { + using Base = edm::ESProducerExternalWork; public: static void prevalidate(edm::ConfigurationDescriptions& descriptions) { @@ -76,6 +75,10 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { auto const& devices = cms::alpakatools::devices(); assert(devices.size() == 1); device::Record const deviceRecord(record, devices.front()); + static_assert(std::is_same_v, + alpaka::Queue>, + "Non-blocking queue when trying to use ES data product directly. 
This might indicate a " + "need to extend the Alpaka ESProducer base class."); return std::invoke(iMethod, iThis, deviceRecord); }, label); @@ -94,12 +97,12 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { using TProduct = typename edm::eventsetup::produce::smart_pointer_traits::type; using ProductType = ESDeviceProduct; using ReturnType = detail::ESDeviceProductWithStorage; - return Base::setWhatProduced( - [function = std::forward(func)](TRecord const& record) -> std::unique_ptr { + return Base::setWhatAcquiredProducedWithLambda( + // acquire() part + [function = std::forward(func), synchronize = synchronize_](TRecord const& record, + edm::WaitingTaskWithArenaHolder holder) { // TODO: move the multiple device support into EventSetup system itself auto const& devices = cms::alpakatools::devices(); - std::vector> queues; - queues.reserve(devices.size()); auto ret = std::make_unique(devices.size()); bool allnull = true; bool anynull = false; @@ -112,12 +115,28 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { } else { anynull = true; } - queues.push_back(deviceRecord.queuePtr()); - } - // TODO: to be changed asynchronous later - for (auto& queuePtr : queues) { - alpaka::wait(*queuePtr); + if (synchronize) { + alpaka::wait(deviceRecord.queue()); + } else { + enqueueCallback(deviceRecord.queue(), holder);  // copy, not move: 'holder' is reused for every device in this loop + } + // The Queue is returned to the QueueCache. The same + // Queue may be used for other work before the work + // enqueued here finishes. The only impact would be a + // (slight?) delay in the completion of the other work. + // Given that the ESProducers are expected to be mostly + // for host-to-device data copies, that are serialized + // anyway (at least on current NVIDIA), this should be + // reasonable behavior for now. 
} + return std::tuple(std::move(ret), allnull, anynull); + }, + // produce() part, called after the asynchronous work in all queues has finished + [](TRecord const& record, auto fromAcquire) -> std::unique_ptr { + auto [ret, allnull, anynull] = std::move(fromAcquire); + // The 'allnull'/'anynull' actions are in produce() + // to keep any destination memory in 'ret' + // alive until the asynchronous work has finished if (allnull) { return nullptr; } else if (anynull) { @@ -129,17 +148,19 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { // make the EventSetup system itself aware of multiple // devies (or memory spaces). I hope this exception // would be good-enough until we get there. - ESProducer::throwSomeNullException(); + throwSomeNullException(); } - return ret; + return std::move(ret); }, label); } + static void enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder); static void throwSomeNullException(); std::string const moduleLabel_; std::string const appendToDataLabel_; + bool const synchronize_; }; } // namespace ALPAKA_ACCELERATOR_NAMESPACE diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h index d8d2f2de6fea3..c2d41e91a9106 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h @@ -10,8 +10,6 @@ #include "FWCore/Utilities/interface/Transition.h" #include "HeterogeneousCore/AlpakaCore/interface/alpaka/DeviceProductType.h" #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h" -#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h" -#include "HeterogeneousCore/AlpakaCore/interface/QueueCache.h" #include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h" #include "HeterogeneousCore/AlpakaInterface/interface/Backend.h" #include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h" diff --git 
a/HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc b/HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc index c4f7c58a68e1d..028718bc74c85 100644 --- a/HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc +++ b/HeterogeneousCore/AlpakaCore/src/alpaka/ESProducer.cc @@ -1,10 +1,29 @@ +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/interface/EDMException.h" +#include "HeterogeneousCore/AlpakaCore/interface/EventCache.h" #include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESProducer.h" +#include "HeterogeneousCore/AlpakaInterface/interface/HostOnlyTask.h" namespace ALPAKA_ACCELERATOR_NAMESPACE { ESProducer::ESProducer(edm::ParameterSet const& iConfig) : moduleLabel_(iConfig.getParameter("@module_label")), - appendToDataLabel_(iConfig.getParameter("appendToDataLabel")) {} + appendToDataLabel_(iConfig.getParameter("appendToDataLabel")), + // The 'synchronize' parameter can be unset in Alpaka modules + // specified with the namespace prefix instead of '@alpaka' + // suffix + synchronize_( + iConfig.getUntrackedParameter("alpaka").getUntrackedParameter("synchronize", false)) {} + + void ESProducer::enqueueCallback(Queue& queue, edm::WaitingTaskWithArenaHolder holder) { + edm::Service async; + auto event = cms::alpakatools::getEventCache().get(alpaka::getDev(queue)); + alpaka::enqueue(queue, *event); + async->runAsync( + std::move(holder), + [event = std::move(event)]() mutable { alpaka::wait(*event); }, + []() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::ESProducer::enqueueCallback()"; }); + } void ESProducer::throwSomeNullException() { throw edm::Exception(edm::errors::UnimplementedFeature)