diff --git a/HeterogeneousCore/AlpakaCore/README.md b/HeterogeneousCore/AlpakaCore/README.md index 4dd972ab2c863..dc725db4eb00b 100644 --- a/HeterogeneousCore/AlpakaCore/README.md +++ b/HeterogeneousCore/AlpakaCore/README.md @@ -170,7 +170,7 @@ Also note that the `fillDescription()` function must have the same content for a * All Event data products in the host memory space are guaranteed to be accessible for all operations (after the data product has been obtained from the `edm::Event` or `device::Event`). * All EventSetup data products in the device memory space are guaranteed to be accessible only for operations enqueued in the `Queue` given by `device::Event::queue()` when accessed via the `device::EventSetup` (ED modules), or by `device::Record::queue()` when accessed via the `device::Record` (ESProducers). * The EDM Stream does not proceed to the next Event until after all asynchronous work of the current Event has finished. - * **Note**: currently this guarantee does not hold if the job has any EDModule that launches asynchronous work but does not explicitly synchronize or produce any device-side data products. + * **Note**: this implies that if an EDProducer in its `produce()` function uses the `Event::queue()` or gets a device-side data product, and does not produce any device-side data products, the `produce()` call will be synchronous (i.e. 
will block the CPU thread until the asynchronous work finishes) ## Examples diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h index ddfb9f706621b..016df5d9ad59e 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h @@ -48,6 +48,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { Queue& queue() const { return *queue_; } void recordEvent() {} + void discardEvent() {} private: std::shared_ptr queue_; @@ -73,6 +74,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { void enqueueCallback(edm::WaitingTaskWithArenaHolder holder); void recordEvent() { alpaka::enqueue(*queue_, *event_); } + void discardEvent() { event_.reset(); } /** * Synchronizes 'consumer' metadata wrt. 'this' in the event product @@ -92,6 +94,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { // consumer or not. The goal is to have a "chain" of modules to // queue their work to the same queue. 
mutable std::atomic mayReuseQueue_ = true; + // Cache to potentially reduce alpaka::wait() calls + mutable std::atomic eventComplete_ = false; }; #endif } // namespace ALPAKA_ACCELERATOR_NAMESPACE diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h index 4698f029a5b7a..a56670af92210 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h @@ -26,7 +26,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { std::shared_ptr metadata() { return metadata_; } - void finish(); + // true if asynchronous work was (possibly) launched + void finish(bool launchedAsyncWork); private: std::shared_ptr metadata_; diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h index 6ddd0dcc2356e..247d4f7a8844b 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h @@ -137,6 +137,9 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device { } } + // implementation details + bool wasQueueUsed() const { return queueUsed_; } + private: // Having both const and non-const here in order to serve the // clients with one device::Event class @@ -144,6 +147,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device { edm::Event* event_ = nullptr; std::shared_ptr metadata_; + // device::Event is not supposed to be const-thread-safe, so no // additional protection is needed. 
mutable bool queueUsed_ = false; diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h index 2142f9a7a3417..374ee0001f376 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { device::EventSetup const es(iSetup, ev.device()); produce(sid, ev, es); this->putBackend(iEvent); - sentry.finish(); + sentry.finish(ev.wasQueueUsed()); } virtual void produce(edm::StreamID sid, device::Event& iEvent, device::EventSetup const& iSetup) const = 0; diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/EDProducer.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/EDProducer.h index b51e10beae231..08687b0b804b3 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/EDProducer.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/EDProducer.h @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { device::EventSetup const es(iSetup, ev.device()); produce(ev, es); this->putBackend(iEvent); - sentry.finish(); + sentry.finish(ev.wasQueueUsed()); } virtual void produce(device::Event& iEvent, device::EventSetup const& iSetup) = 0; diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h index a88fb250d41b1..28abd77be20db 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h @@ -36,7 +36,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { device::EventSetup const es(iSetup, ev.device()); produce(ev, es); this->putBackend(iEvent); - sentry.finish(); + sentry.finish(ev.wasQueueUsed()); } virtual void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) = 0; 
diff --git a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc index 8847d9b3f8e71..340bb6cb7083d 100644 --- a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc +++ b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc @@ -14,7 +14,14 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { // TODO: a callback notifying a WaitingTaskHolder (or similar) // would avoid blocking the CPU, but would also require more work. - if (event_) { + // If event_ is null, the EDMetadata was either + // default-constructed, or fully synchronized before leaving the + // produce() call, so no synchronization is needed. + // If the queue was re-used, then some other EDMetadata object in + // the same edm::Event records the event_ (in the same queue) and + // calls alpaka::wait(), and therefore this wait() call can be + // skipped). + if (event_ and not eventComplete_ and mayReuseQueue_) { // Must not throw in a destructor, and if there were an // exception could not really propagate it anyway. CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) { @@ -42,12 +49,25 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { } } + if (eventComplete_) { + return; + } + // TODO: how necessary this check is? if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) { throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported"; } - if (not alpaka::isComplete(*event_)) { + // If the event has been discarded, the produce() function that + // constructed this EDMetadata object did not launch any + // asynchronous work. + if (not event_) { + return; + } + + if (alpaka::isComplete(*event_)) { + eventComplete_ = true; + } else { // Event not yet occurred, so need to add synchronization // here. 
Sychronization is done by making the queue to wait // for an event, so all subsequent work in the queue will run diff --git a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc index 7828fdeaaa785..da5e9065e9c12 100644 --- a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc +++ b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc @@ -15,6 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { #endif } - void EDMetadataSentry::finish() { metadata_->recordEvent(); } + void EDMetadataSentry::finish(bool launchedAsyncWork) { + if (launchedAsyncWork) { + metadata_->recordEvent(); + } else { + // If we are certain no asynchronous work was launched (i.e. + // the Queue was not used in any way), there is no need to + // synchronize, and the Event can be discarded. + metadata_->discardEvent(); + } + } } // namespace detail } // namespace ALPAKA_ACCELERATOR_NAMESPACE diff --git a/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaGlobalProducerNoOutput.cc b/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaGlobalProducerNoOutput.cc new file mode 100644 index 0000000000000..d7d361b6ab8d1 --- /dev/null +++ b/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaGlobalProducerNoOutput.cc @@ -0,0 +1,38 @@ +#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/InputTag.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" + +namespace ALPAKA_ACCELERATOR_NAMESPACE { + /** + * This EDProducer only 
consumes a device EDProduct, and is intended + * only for testing purposes. Do not use it as an example. + */ + class TestAlpakaGlobalProducerNoOutput : public global::EDProducer<> { + public: + TestAlpakaGlobalProducerNoOutput(edm::ParameterSet const& config) + : getToken_(consumes(config.getParameter("source"))) {} + + void produce(edm::StreamID, device::Event& iEvent, device::EventSetup const& iSetup) const override { + [[maybe_unused]] auto const& input = iEvent.get(getToken_); + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.add("source", edm::InputTag{}); + + descriptions.addWithDefaultLabel(desc); + } + + private: + const device::EDGetToken getToken_; + }; +} // namespace ALPAKA_ACCELERATOR_NAMESPACE + +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h" +DEFINE_FWK_ALPAKA_MODULE(TestAlpakaGlobalProducerNoOutput); diff --git a/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaStreamSynchronizingProducerToDevice.cc b/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaStreamSynchronizingProducerToDevice.cc new file mode 100644 index 0000000000000..913636f686805 --- /dev/null +++ b/HeterogeneousCore/AlpakaTest/plugins/alpaka/TestAlpakaStreamSynchronizingProducerToDevice.cc @@ -0,0 +1,62 @@ +#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/InputTag.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" + +#include "TestAlgo.h" + +namespace ALPAKA_ACCELERATOR_NAMESPACE { + /** + * This class demonstrates a stream EDProducer 
that + * - produces a device EDProduct (that can get transferred to host automatically) + * - synchronizes in a non-blocking way with the ExternalWork module + * ability (via the SynchronizingEDProcucer base class) + */ + class TestAlpakaStreamSynchronizingProducerToDevice : public stream::SynchronizingEDProducer<> { + public: + TestAlpakaStreamSynchronizingProducerToDevice(edm::ParameterSet const& iConfig) + : putToken_{produces()}, + size_{iConfig.getParameter("size").getParameter( + EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))} {} + + void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) override { + deviceProduct_ = std::make_unique(size_, iEvent.queue()); + + // run the algorithm, potentially asynchronously + algo_.fill(iEvent.queue(), *deviceProduct_); + } + + void produce(device::Event& iEvent, device::EventSetup const& iSetup) override { + iEvent.put(putToken_, std::move(deviceProduct_)); + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + + edm::ParameterSetDescription psetSize; + psetSize.add("alpaka_serial_sync"); + psetSize.add("alpaka_cuda_async"); + psetSize.add("alpaka_rocm_async"); + desc.add("size", psetSize); + + descriptions.addWithDefaultLabel(desc); + } + + private: + const device::EDPutToken putToken_; + const int32_t size_; + + // implementation of the algorithm + TestAlgo algo_; + + std::unique_ptr deviceProduct_; + }; + +} // namespace ALPAKA_ACCELERATOR_NAMESPACE + +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h" +DEFINE_FWK_ALPAKA_MODULE(TestAlpakaStreamSynchronizingProducerToDevice); diff --git a/HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py b/HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py index 331fac9b84312..238fee3597e70 100644 --- a/HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py +++ b/HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py @@ -97,12 +97,22 @@ intSource = 
cms.InputTag("intProduct"), expectedInt = cms.int32(84) # sum of intProduct and esProducerA ) +process.alpakaStreamSynchronizingProducerToDevice = cms.EDProducer("TestAlpakaStreamSynchronizingProducerToDevice@alpaka", + size = cms.PSet( + alpaka_serial_sync = cms.int32(1), + alpaka_cuda_async = cms.int32(2), + alpaka_rocm_async = cms.int32(3), + ) +) process.alpakaGlobalConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer", source = cms.InputTag("alpakaGlobalProducer"), expectSize = cms.int32(10), expectBackend = cms.string("SerialSync") ) +process.alpakaGlobalDeviceConsumer = cms.EDProducer("TestAlpakaGlobalProducerNoOutput@alpaka", + source = cms.InputTag("alpakaGlobalProducer") +) process.alpakaGlobalConsumerE = process.alpakaGlobalConsumer.clone( source = "alpakaGlobalProducerE", expectXvalues = cms.vdouble([(i%2)*10+1 + abs(27)+i*2 for i in range(0,5)] + [0]*5) @@ -112,6 +122,9 @@ expectSize = cms.int32(5), expectBackend = cms.string("SerialSync") ) +process.alpakaStreamDeviceConsumer = process.alpakaGlobalDeviceConsumer.clone( + source = "alpakaStreamProducer" +) process.alpakaStreamInstanceConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer", source = cms.InputTag("alpakaStreamInstanceProducer", "testInstance"), expectSize = cms.int32(6), @@ -122,6 +135,10 @@ expectSize = cms.int32(10), expectBackend = cms.string("SerialSync") ) +process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1 = process.alpakaGlobalDeviceConsumer.clone( + source = "alpakaStreamSynchronizingProducerToDevice" +) +process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2 = process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1.clone() process.alpakaNullESConsumer = cms.EDProducer("TestAlpakaGlobalProducerNullES@alpaka", eventSetupSource = cms.ESInputTag("", "null") ) @@ -132,7 +149,10 @@ for name in ["ESProducerA", "ESProducerB", "ESProducerC", "ESProducerD", "ESProducerE", "ESProducerNull", "GlobalProducer", "GlobalProducerE", - "StreamProducer", "StreamInstanceProducer", 
"StreamSynchronizingProducer", + "StreamProducer", "StreamInstanceProducer", + "StreamSynchronizingProducer", "StreamSynchronizingProducerToDevice", + "GlobalDeviceConsumer", "StreamDeviceConsumer", + "StreamSynchronizingProducerToDeviceDeviceConsumer1", "StreamSynchronizingProducerToDeviceDeviceConsumer2", "NullESConsumer"]: mod = getattr(process, "alpaka"+name) mod.alpaka = cms.untracked.PSet(backend = cms.untracked.string(args.moduleBackend)) @@ -173,14 +193,19 @@ def setExpect(m, size): process.alpakaGlobalProducerE, process.alpakaStreamProducer, process.alpakaStreamInstanceProducer, - process.alpakaStreamSynchronizingProducer + process.alpakaStreamSynchronizingProducer, + process.alpakaStreamSynchronizingProducerToDevice ) process.p = cms.Path( process.alpakaGlobalConsumer+ + process.alpakaGlobalDeviceConsumer+ process.alpakaGlobalConsumerE+ process.alpakaStreamConsumer+ + process.alpakaStreamDeviceConsumer+ process.alpakaStreamInstanceConsumer+ process.alpakaStreamSynchronizingConsumer+ + process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1+ + process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2+ process.alpakaNullESConsumer, process.t )