diff --git a/HeterogeneousCore/SonicCore/BuildFile.xml b/HeterogeneousCore/SonicCore/BuildFile.xml new file mode 100644 index 0000000000000..78155cf0eafb0 --- /dev/null +++ b/HeterogeneousCore/SonicCore/BuildFile.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/HeterogeneousCore/SonicCore/README.md b/HeterogeneousCore/SonicCore/README.md new file mode 100644 index 0000000000000..a341992e76851 --- /dev/null +++ b/HeterogeneousCore/SonicCore/README.md @@ -0,0 +1,103 @@ +# SONIC core infrastructure + +SONIC: Services for Optimized Network Inference on Coprocessors + +## For analyzers + +The `SonicEDProducer` class template extends the basic Stream producer module in CMSSW. + +To implement a concrete derived producer class, the following skeleton can be used: +```cpp +#include "HeterogeneousCore/SonicCore/interface/SonicEDProducer.h" +#include "FWCore/Framework/interface/MakerMacros.h" + +class MyProducer : public SonicEDProducer +{ + public: + explicit MyProducer(edm::ParameterSet const& cfg) : SonicEDProducer(cfg) { + //for debugging + setDebugName("MyProducer"); + } + void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, Input& iInput) override { + //convert event data to client input format + } + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, Output const& iOutput) override { + //convert client output to event data format + } + static void fillDescriptions(edm::ConfigurationDescriptions & descriptions) { + edm::ParameterSetDescription desc; + Client::fillPSetDescription(desc); + //add producer-specific parameters + descriptions.add("MyProducer",desc); + } +}; + +DEFINE_FWK_MODULE(MyProducer); +``` + +The generic `Client` must be replaced with a concrete client (see next section), which has specific input and output types. + +The python configuration for the producer should include a dedicated `PSet` for the client parameters: +```python +process.MyProducer = cms.EDProducer("MyProducer", + Client = cms.PSet( + # necessary client options go here + ) +) +``` +These parameters can be prepopulated and validated by the client using `fillDescriptions` (see below). + +An example producer can be found in the [test](./test) folder. + +## For developers + +To add a new communication protocol for SONIC, follow these steps: +1. Submit the communication protocol software and any new dependencies to [cmsdist](https://github.com/cms-sw/cmsdist) as externals +2. Set up the concrete client(s) that use the communication protocol in a new package in the `HeterogeneousCore` subsystem +3. Add a test producer (see above) to make sure it works + +To implement a concrete client, the following skeleton can be used for the `.h` file, with the function implementations in an associated `.cc` file: +```cpp +#ifndef HeterogeneousCore_MyPackage_MyClient +#define HeterogeneousCore_MyPackage_MyClient + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClient*.h" + +class MyClient : public SonicClient* { + public: + MyClient(const edm::ParameterSet& params); + + static void fillPSetDescription(edm::ParameterSetDescription& iDesc); + + protected: + void evaluate() override; +}; + +#endif +``` + +The generic `SonicClient*` should be replaced with one of the available modes: +* `SonicClientSync`: synchronous call, blocks until the result is returned. +* `SonicClientAsync`: asynchronous, non-blocking call. +* `SonicClientPseudoAsync`: turns a synchronous, blocking call into an asynchronous, non-blocking call, by waiting for the result in a separate `std::thread`. + +`SonicClientAsync` is the most efficient, but can only be used if asynchronous, non-blocking calls are supported by the communication protocol in use. + +In addition, as indicated, the input and output data types must be specified. +(If both types are the same, only the input type needs to be specified.) + +In all cases, the implementation of `evaluate()` must call `finish()`. +For the `Sync` and `PseudoAsync` modes, `finish()` should be called at the end of `evaluate()`. +For the `Async` mode, `finish()` should be called inside the communication protocol callback function (implementations may vary). + +The client must also provide a static method `fillPSetDescription` to populate its parameters in the `fillDescriptions` for the producers that use the client: +```cpp +void MyClient::fillPSetDescription(edm::ParameterSetDescription& iDesc) { + edm::ParameterSetDescription descClient; + //add parameters + iDesc.add("Client",descClient); +} +``` + +Example client code can be found in the `interface` and `src` directories of the other Sonic packages in this repository. diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientAsync.h b/HeterogeneousCore/SonicCore/interface/SonicClientAsync.h new file mode 100644 index 0000000000000..cd8c03dba17aa --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicClientAsync.h @@ -0,0 +1,20 @@ +#ifndef HeterogeneousCore_SonicCore_SonicClientAsync +#define HeterogeneousCore_SonicCore_SonicClientAsync + +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" + +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientTypes.h" + +template +class SonicClientAsync : public SonicClientBase, public SonicClientTypes { +public: + //main operation + void dispatch(edm::WaitingTaskWithArenaHolder holder) override final { + holder_ = std::move(holder); + setStartTime(); + evaluate(); + } +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientBase.h b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h new file mode 100644 index 0000000000000..7ea3fe19afb24 --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicClientBase.h @@ -0,0 +1,48 @@ +#ifndef HeterogeneousCore_SonicCore_SonicClientBase +#define HeterogeneousCore_SonicCore_SonicClientBase + +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include +#include +#include + +class SonicClientBase { +public: + //destructor + virtual ~SonicClientBase() = default; + + void setDebugName(const std::string& debugName) { debugName_ = debugName; } + const std::string& debugName() const { return debugName_; } + + //main operation + virtual void dispatch(edm::WaitingTaskWithArenaHolder holder) = 0; + +protected: + virtual void evaluate() = 0; + + void setStartTime() { + if (debugName_.empty()) + return; + t0_ = std::chrono::high_resolution_clock::now(); + } + + void finish(std::exception_ptr eptr = std::exception_ptr{}) { + if (!debugName_.empty()) { + auto t1 = std::chrono::high_resolution_clock::now(); + edm::LogInfo(debugName_) << "Client time: " + << std::chrono::duration_cast(t1 - t0_).count(); + } + holder_.doneWaiting(eptr); + } + + //members + edm::WaitingTaskWithArenaHolder holder_; + + //for logging/debugging + std::string debugName_; + std::chrono::time_point t0_; +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientPseudoAsync.h b/HeterogeneousCore/SonicCore/interface/SonicClientPseudoAsync.h new file mode 100644 index 0000000000000..5ea20ed3ad30c --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicClientPseudoAsync.h @@ -0,0 +1,76 @@ +#ifndef HeterogeneousCore_SonicCore_SonicClientPseudoAsync +#define HeterogeneousCore_SonicCore_SonicClientPseudoAsync + +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientTypes.h" + +#include +#include +#include +#include +#include +#include + +//pretend to be async + non-blocking by waiting for blocking calls to return in separate std::thread +template +class SonicClientPseudoAsync : public SonicClientBase, public SonicClientTypes { +public: + //constructor + SonicClientPseudoAsync() : SonicClientBase(), SonicClientTypes(), hasCall_(false), stop_(false) { + thread_ = std::make_unique([this]() { waitForNext(); }); + } + //destructor + ~SonicClientPseudoAsync() override { + stop_ = true; + cond_.notify_one(); + if (thread_) { + try { + thread_->join(); + thread_.reset(); + } catch (...) { + } + } + } + //accessor + void dispatch(edm::WaitingTaskWithArenaHolder holder) override final { + //do all read/writes inside lock to ensure cache synchronization + { + std::lock_guard guard(mutex_); + holder_ = std::move(holder); + setStartTime(); + + //activate thread to wait for response, and return + hasCall_ = true; + } + cond_.notify_one(); + } + +private: + void waitForNext() { + while (true) { + //wait for condition + { + std::unique_lock lk(mutex_); + cond_.wait(lk, [this]() { return (hasCall_ or stop_); }); + if (stop_) + break; + + //do everything inside lock + evaluate(); + + //reset condition + hasCall_ = false; + } + } + } + + //members + bool hasCall_; + std::mutex mutex_; + std::condition_variable cond_; + std::atomic stop_; + std::unique_ptr thread_; +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientSync.h b/HeterogeneousCore/SonicCore/interface/SonicClientSync.h new file mode 100644 index 0000000000000..247a36b4c5d2e --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicClientSync.h @@ -0,0 +1,23 @@ +#ifndef HeterogeneousCore_SonicCore_SonicClientSync +#define HeterogeneousCore_SonicCore_SonicClientSync + +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" + +#include "HeterogeneousCore/SonicCore/interface/SonicClientBase.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientTypes.h" + +#include + +template +class SonicClientSync : public SonicClientBase, public SonicClientTypes { +public: + //main operation + void dispatch(edm::WaitingTaskWithArenaHolder holder) override final { + holder_ = std::move(holder); + setStartTime(); + + evaluate(); + } +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/interface/SonicClientTypes.h b/HeterogeneousCore/SonicCore/interface/SonicClientTypes.h new file mode 100644 index 0000000000000..3056893c8fb29 --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicClientTypes.h @@ -0,0 +1,23 @@ +#ifndef HeterogeneousCore_SonicCore_SonicClientTypes +#define HeterogeneousCore_SonicCore_SonicClientTypes + +//this base class exists to limit the impact of dependent scope in derived classes +template +class SonicClientTypes { +public: + //typedefs for outside accessibility + typedef InputT Input; + typedef OutputT Output; + //destructor + virtual ~SonicClientTypes() = default; + + //accessors + Input& input() { return input_; } + const Output& output() const { return output_; } + +protected: + Input input_; + Output output_; +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/interface/SonicEDProducer.h b/HeterogeneousCore/SonicCore/interface/SonicEDProducer.h new file mode 100644 index 0000000000000..0bc0689999ede --- /dev/null +++ b/HeterogeneousCore/SonicCore/interface/SonicEDProducer.h @@ -0,0 +1,55 @@ +#ifndef HeterogeneousCore_SonicCore_SonicEDProducer +#define HeterogeneousCore_SonicCore_SonicEDProducer + +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/Framework/interface/EventSetup.h" +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include +#include + +//this is a stream producer because client operations are not multithread-safe in general +//it is designed such that the user never has to interact with the client or the acquire() callback directly +template +class SonicEDProducer : public edm::stream::EDProducer { +public: + //typedefs to simplify usage + typedef typename Client::Input Input; + typedef typename Client::Output Output; + //constructor + SonicEDProducer(edm::ParameterSet const& cfg) : client_(cfg.getParameter("Client")) {} + //destructor + virtual ~SonicEDProducer() = default; + + //derived classes use a dedicated acquire() interface that incorporates client_.input() + //(no need to interact with callback holder) + void acquire(edm::Event const& iEvent, + edm::EventSetup const& iSetup, + edm::WaitingTaskWithArenaHolder holder) override final { + auto t0 = std::chrono::high_resolution_clock::now(); + acquire(iEvent, iSetup, client_.input()); + auto t1 = std::chrono::high_resolution_clock::now(); + if (!client_.debugName().empty()) + edm::LogInfo(client_.debugName()) << "Load time: " + << std::chrono::duration_cast(t1 - t0).count(); + client_.dispatch(holder); + } + virtual void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, Input& iInput) = 0; + //derived classes use a dedicated produce() interface that incorporates client_.output() + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup) override final { + //todo: measure time between acquire and produce + produce(iEvent, iSetup, client_.output()); + } + virtual void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, Output const& iOutput) = 0; + +protected: + //for debugging + void setDebugName(const std::string& debugName) { client_.setDebugName(debugName); } + //members + Client client_; +}; + +#endif diff --git a/HeterogeneousCore/SonicCore/test/BuildFile.xml b/HeterogeneousCore/SonicCore/test/BuildFile.xml new file mode 100644 index 0000000000000..c95f778ce31d0 --- /dev/null +++ b/HeterogeneousCore/SonicCore/test/BuildFile.xml @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/HeterogeneousCore/SonicCore/test/DummyClient.h b/HeterogeneousCore/SonicCore/test/DummyClient.h new file mode 100644 index 0000000000000..228730ce1a374 --- /dev/null +++ b/HeterogeneousCore/SonicCore/test/DummyClient.h @@ -0,0 +1,47 @@ +#ifndef HeterogeneousCore_SonicCore_test_DummyClient +#define HeterogeneousCore_SonicCore_test_DummyClient + +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientSync.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientPseudoAsync.h" +#include "HeterogeneousCore/SonicCore/interface/SonicClientAsync.h" + +#include +#include +#include + +template +class DummyClient : public Client { +public: + //constructor + DummyClient(const edm::ParameterSet& params) + : factor_(params.getParameter("factor")), wait_(params.getParameter("wait")) {} + + //for fillDescriptions + static void fillPSetDescription(edm::ParameterSetDescription& iDesc) { + edm::ParameterSetDescription descClient; + descClient.add("factor", -1); + descClient.add("wait", 10); + iDesc.add("Client", descClient); + } + +protected: + void evaluate() override { + //simulate a blocking call + std::this_thread::sleep_for(std::chrono::seconds(wait_)); + + this->output_ = this->input_ * factor_; + this->finish(); + } + + //members + int factor_; + int wait_; +}; + +typedef DummyClient> DummyClientSync; +typedef DummyClient> DummyClientPseudoAsync; +typedef DummyClient> DummyClientAsync; + +#endif diff --git a/HeterogeneousCore/SonicCore/test/SonicDummyProducer.cc b/HeterogeneousCore/SonicCore/test/SonicDummyProducer.cc new file mode 100644 index 0000000000000..dc406c86c930b --- /dev/null +++ b/HeterogeneousCore/SonicCore/test/SonicDummyProducer.cc @@ -0,0 +1,53 @@ +#include "DummyClient.h" +#include "HeterogeneousCore/SonicCore/interface/SonicEDProducer.h" +#include "DataFormats/TestObjects/interface/ToyProducts.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/Framework/interface/MakerMacros.h" + +#include + +namespace sonictest { + template + class SonicDummyProducer : public SonicEDProducer { + public: + //needed because base class has dependent scope + using typename SonicEDProducer::Input; + using typename SonicEDProducer::Output; + explicit SonicDummyProducer(edm::ParameterSet const& cfg) + : SonicEDProducer(cfg), input_(cfg.getParameter("input")) { + //for debugging + this->setDebugName("SonicDummyProducer"); + putToken_ = this->template produces(); + } + + void acquire(edm::Event const& iEvent, edm::EventSetup const& iSetup, Input& iInput) override { iInput = input_; } + + void produce(edm::Event& iEvent, edm::EventSetup const& iSetup, Output const& iOutput) override { + iEvent.emplace(putToken_, iOutput); + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + Client::fillPSetDescription(desc); + desc.add("input"); + //to ensure distinct cfi names + descriptions.addWithDefaultLabel(desc); + } + + private: + //members + int input_; + edm::EDPutTokenT putToken_; + }; + + typedef SonicDummyProducer SonicDummyProducerSync; + typedef SonicDummyProducer SonicDummyProducerPseudoAsync; + typedef SonicDummyProducer SonicDummyProducerAsync; +} // namespace sonictest + +using sonictest::SonicDummyProducerSync; +DEFINE_FWK_MODULE(SonicDummyProducerSync); +using sonictest::SonicDummyProducerPseudoAsync; +DEFINE_FWK_MODULE(SonicDummyProducerPseudoAsync); +using sonictest::SonicDummyProducerAsync; +DEFINE_FWK_MODULE(SonicDummyProducerAsync); diff --git a/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py new file mode 100644 index 0000000000000..bd807088c0d83 --- /dev/null +++ b/HeterogeneousCore/SonicCore/test/sonicTest_cfg.py @@ -0,0 +1,55 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("Test") + +process.source = cms.Source("EmptySource") + +process.maxEvents.input = 1 + +process.options.numberOfThreads = 2 +process.options.numberOfStreams = 0 + +process.dummySync = cms.EDProducer("SonicDummyProducerSync", + input = cms.int32(1), + Client = cms.PSet( + factor = cms.int32(-1), + wait = cms.int32(10), + ), +) + +process.dummyPseudoAsync = cms.EDProducer("SonicDummyProducerPseudoAsync", + input = cms.int32(2), + Client = cms.PSet( + factor = cms.int32(2), + wait = cms.int32(10), + ), +) + +process.dummyAsync = cms.EDProducer("SonicDummyProducerAsync", + input = cms.int32(3), + Client = cms.PSet( + factor = cms.int32(5), + wait = cms.int32(10), + ), +) + +process.task = cms.Task(process.dummySync,process.dummyPseudoAsync,process.dummyAsync) + +process.testerSync = cms.EDAnalyzer("IntTestAnalyzer", + valueMustMatch = cms.untracked.int32(-1), + moduleLabel = cms.untracked.string("dummySync"), +) + +process.testerPseudoAsync = cms.EDAnalyzer("IntTestAnalyzer", + valueMustMatch = cms.untracked.int32(4), + moduleLabel = cms.untracked.string("dummyPseudoAsync"), +) + +process.testerAsync = cms.EDAnalyzer("IntTestAnalyzer", + valueMustMatch = cms.untracked.int32(15), + moduleLabel = cms.untracked.string("dummyAsync"), +) + +process.p1 = cms.Path(process.testerSync, process.task) +process.p2 = cms.Path(process.testerPseudoAsync, process.task) +process.p3 = cms.Path(process.testerAsync, process.task)