diff --git a/FWCore/Concurrency/BuildFile.xml b/FWCore/Concurrency/BuildFile.xml index a9748b4ba808d..20afbc6592e02 100644 --- a/FWCore/Concurrency/BuildFile.xml +++ b/FWCore/Concurrency/BuildFile.xml @@ -1,4 +1,4 @@ - + diff --git a/FWCore/Concurrency/interface/Async.h b/FWCore/Concurrency/interface/Async.h new file mode 100644 index 0000000000000..c14e060ef9db8 --- /dev/null +++ b/FWCore/Concurrency/interface/Async.h @@ -0,0 +1,34 @@ +#ifndef FWCore_Concurrency_Async_h +#define FWCore_Concurrency_Async_h + +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "FWCore/Concurrency/interface/WaitingThreadPool.h" + +namespace edm { + // All member functions are thread safe + class Async { + public: + Async() = default; + virtual ~Async() noexcept; + + // prevent copying and moving + Async(Async const&) = delete; + Async(Async&&) = delete; + Async& operator=(Async const&) = delete; + Async& operator=(Async&&) = delete; + + template + void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) { + ensureAllowed(); + pool_.runAsync(std::move(holder), std::forward(func), std::forward(errorContextFunc)); + } + + protected: + virtual void ensureAllowed() const = 0; + + private: + WaitingThreadPool pool_; + }; +} // namespace edm + +#endif diff --git a/FWCore/Concurrency/interface/WaitingThreadPool.h b/FWCore/Concurrency/interface/WaitingThreadPool.h new file mode 100644 index 0000000000000..affca59422964 --- /dev/null +++ b/FWCore/Concurrency/interface/WaitingThreadPool.h @@ -0,0 +1,106 @@ +#ifndef FWCore_Concurrency_WaitingThreadPool_h +#define FWCore_Concurrency_WaitingThreadPool_h + +#include "FWCore/Utilities/interface/ConvertException.h" +#include "FWCore/Utilities/interface/ReusableObjectHolder.h" +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" + +#include +#include +#include + +namespace edm { + namespace impl { + class WaitingThread { + public: + WaitingThread(); + ~WaitingThread() noexcept; + + WaitingThread(WaitingThread const&) = delete; + WaitingThread& operator=(WaitingThread&&) = delete; + WaitingThread(WaitingThread&&) = delete; + WaitingThread& operator=(WaitingThread const&) = delete; + + template + void run(WaitingTaskWithArenaHolder holder, + F&& func, + G&& errorContextFunc, + std::shared_ptr thisPtr) { + std::unique_lock lk(mutex_); + func_ = [holder = std::move(holder), + func = std::forward(func), + errorContext = std::forward(errorContextFunc)]() mutable { + try { + convertException::wrap([&func]() { func(); }); + } catch (cms::Exception& e) { + e.addContext(errorContext()); + holder.doneWaiting(std::current_exception()); + } + }; + thisPtr_ = std::move(thisPtr); + cond_.notify_one(); + } + + private: + void stopThread() { + std::unique_lock lk(mutex_); + stopThread_ = true; + cond_.notify_one(); + } + + void threadLoop() noexcept; + + std::thread thread_; + std::mutex mutex_; + std::condition_variable cond_; + CMS_THREAD_GUARD(mutex_) std::function func_; + // The purpose of thisPtr_ is to keep the WaitingThread object + // outside of the WaitingThreadPool until the func_ has returned. + CMS_THREAD_GUARD(mutex_) std::shared_ptr thisPtr_; + CMS_THREAD_GUARD(mutex_) bool stopThread_ = false; + }; + } // namespace impl + + // Provides a mechanism to run the function 'func' asynchronously, + // i.e. without the calling thread to wait for the func() to return. + // The func should do as little work (outside of the TBB threadpool) + // as possible. The func must terminate eventually. The intended use + // case are blocking synchronization calls with external entities, + // where the calling thread is suspended while waiting. + // + // The func() is run in a thread that belongs to a separate pool of + // threads than the calling thread. Remotely similar to + // std::async(), but instead of dealing with std::futures, takes an + // edm::WaitingTaskWithArenaHolder object, that is signaled upon the + // func() returning or throwing an exception. + // + // The caller is responsible for keeping the WaitingThreadPool + // object alive at least as long as all asynchronous calls finish. + class WaitingThreadPool { + public: + WaitingThreadPool() = default; + WaitingThreadPool(WaitingThreadPool const&) = delete; + WaitingThreadPool& operator=(WaitingThreadPool const&) = delete; + WaitingThreadPool(WaitingThreadPool&&) = delete; + WaitingThreadPool& operator=(WaitingThreadPool&&) = delete; + + /** + * \param holder WaitingTaskWithArenaHolder object to signal the completion of 'func' + * \param func Function to run in a separate thread + * \param errorContextFunc Function returning a string-like object + * that is added to the context of + * cms::Exception in case 'func' throws an + * exception + */ + template + void runAsync(WaitingTaskWithArenaHolder holder, F&& func, G&& errorContextFunc) { + auto thread = pool_.makeOrGet([]() { return std::make_unique(); }); + thread->run(std::move(holder), std::forward(func), std::forward(errorContextFunc), std::move(thread)); + } + + private: + edm::ReusableObjectHolder pool_; + }; +} // namespace edm + +#endif diff --git a/FWCore/Concurrency/src/Async.cc b/FWCore/Concurrency/src/Async.cc new file mode 100644 index 0000000000000..86cbabe4dfc32 --- /dev/null +++ b/FWCore/Concurrency/src/Async.cc @@ -0,0 +1,5 @@ +#include "FWCore/Concurrency/interface/Async.h" + +namespace edm { + Async::~Async() noexcept = default; +} diff --git a/FWCore/Concurrency/src/WaitingThreadPool.cc b/FWCore/Concurrency/src/WaitingThreadPool.cc new file mode 100644 index 0000000000000..51af859d5c6d6 --- /dev/null +++ b/FWCore/Concurrency/src/WaitingThreadPool.cc @@ -0,0 +1,58 @@ +#include "FWCore/Concurrency/interface/WaitingThreadPool.h" + +#include +#include + +#include + +namespace edm::impl { + WaitingThread::WaitingThread() { + thread_ = std::thread(&WaitingThread::threadLoop, this); + static constexpr auto poolName = "edm async pool"; + // pthread_setname_np() string length is limited to 16 characters, + // including the null termination + static_assert(std::string_view(poolName).size() < 16); + + int err = pthread_setname_np(thread_.native_handle(), poolName); + // According to the glibc documentation, the only error + // pthread_setname_np() can return is about the argument C-string + // being too long. We already check above the C-string is shorter + // than the limit was at the time of writing. In order to capture + // if the limit shortens, or other error conditions get added, + // let's assert() anyway (exception feels overkill) + assert(err == 0); + } + + WaitingThread::~WaitingThread() noexcept { + // When we are shutting down, we don't care about any possible + // system errors anymore + CMS_SA_ALLOW try { + stopThread(); + thread_.join(); + } catch (...) { + } + } + + void WaitingThread::threadLoop() noexcept { + std::unique_lock lk(mutex_); + + while (true) { + cond_.wait(lk, [this]() { return static_cast(func_) or stopThread_; }); + if (stopThread_) { + // There should be no way to stop the thread when it as the + // func_ assigned, but let's make sure + assert(not thisPtr_); + break; + } + func_(); + // Must return this WaitingThread to the ReusableObjectHolder in + // the WaitingThreadPool before resettting func_ (that holds the + // WaitingTaskWithArenaHolder, that enables the progress in the + // TBB thread pool) in order to meet the requirement of + // ReusableObjectHolder destructor that there are no outstanding + // objects. + thisPtr_.reset(); + decltype(func_)().swap(func_); + } + } +} // namespace edm::impl diff --git a/FWCore/Concurrency/test/test_catch2_Async.cc b/FWCore/Concurrency/test/test_catch2_Async.cc new file mode 100644 index 0000000000000..b42a276a6d541 --- /dev/null +++ b/FWCore/Concurrency/test/test_catch2_Async.cc @@ -0,0 +1,101 @@ +#include "catch.hpp" + +#include + +#include "oneapi/tbb/global_control.h" + +#include "FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" +#include "FWCore/Concurrency/interface/Async.h" + +namespace { + constexpr char const* errorContext() { return "AsyncServiceTest"; } + + class AsyncServiceTest : public edm::Async { + public: + enum class State { kAllowed, kDisallowed, kShutdown }; + + AsyncServiceTest() = default; + + void setAllowed(bool allowed) noexcept { allowed_ = allowed; } + + private: + void ensureAllowed() const final { + if (not allowed_) { + throw std::runtime_error("Calling run in this context is not allowed"); + } + } + + std::atomic allowed_ = true; + }; +} // namespace + +TEST_CASE("Test Async", "[edm::Async") { + // Using parallelism 2 here because otherwise the + // tbb::task_arena::enqueue() in WaitingTaskWithArenaHolder will + // start a new TBB thread that "inherits" the name from the + // WaitingThreadPool thread. + oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 2); + + SECTION("Normal operation") { + AsyncServiceTest service; + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h1 = first([&service, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + service.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + + auto h2 = first([&service, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + service.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h2.doneWaiting(std::exception_ptr()); + h1.doneWaiting(std::exception_ptr()); + } + waitTask.waitNoThrow(); + REQUIRE(count.load() == 2); + REQUIRE(waitTask.done()); + REQUIRE(not waitTask.exceptionPtr()); + } + + SECTION("Disallowed") { + AsyncServiceTest service; + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h = first([&service, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + service.runAsync( + h2, [&count]() { ++count; }, errorContext); + service.setAllowed(false); + }) | + then([&service, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + service.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + waitTask.waitNoThrow(); + REQUIRE(count.load() == 1); + REQUIRE(waitTask.done()); + REQUIRE(waitTask.exceptionPtr()); + REQUIRE_THROWS_WITH(std::rethrow_exception(waitTask.exceptionPtr()), + Catch::Contains("Calling run in this context is not allowed")); + } +} diff --git a/FWCore/Concurrency/test/test_catch2_WaitingThreadPool.cc b/FWCore/Concurrency/test/test_catch2_WaitingThreadPool.cc new file mode 100644 index 0000000000000..4266e6ec5e5b7 --- /dev/null +++ b/FWCore/Concurrency/test/test_catch2_WaitingThreadPool.cc @@ -0,0 +1,279 @@ +#include "catch.hpp" + +#include "oneapi/tbb/global_control.h" + +#include "FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" +#include "FWCore/Concurrency/interface/WaitingThreadPool.h" +#include "FWCore/Concurrency/interface/hardware_pause.h" + +namespace { + constexpr char const* errorContext() { return "WaitingThreadPool test"; } +} // namespace + +TEST_CASE("Test WaitingThreadPool", "[edm::WaitingThreadPool") { + // Using parallelism 2 here because otherwise the + // tbb::task_arena::enqueue() in WaitingTaskWithArenaHolder will + // start a new TBB thread that "inherits" the name from the + // WaitingThreadPool thread. + oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, 2); + edm::WaitingThreadPool pool; + + SECTION("One async call") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + { + using namespace edm::waiting_task::chain; + auto h = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + std::move(h2), [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + waitTask.waitNoThrow(); + REQUIRE(count.load() == 1); + REQUIRE(waitTask.done()); + REQUIRE(not waitTask.exceptionPtr()); + } + + SECTION("Two async calls") { + std::atomic count{0}; + std::atomic mayContinue{false}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h = first([&pool, &count, &mayContinue](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count, &mayContinue]() { + while (not mayContinue) { + hardware_pause(); + } + using namespace std::chrono_literals; + std::this_thread::sleep_for(10ms); + ++count; + }, + errorContext); + pool.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + mayContinue = true; + waitTask.waitNoThrow(); + REQUIRE(count.load() == 2); + REQUIRE(waitTask.done()); + REQUIRE(not waitTask.exceptionPtr()); + } + + SECTION("Concurrent async calls") { + std::atomic count{0}; + std::atomic mayContinue{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h1 = first([&pool, &count, &mayContinue](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + ++mayContinue; + while (mayContinue != 2) { + hardware_pause(); + } + pool.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + + auto h2 = first([&pool, &count, &mayContinue](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + ++mayContinue; + while (mayContinue != 2) { + hardware_pause(); + } + pool.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h2.doneWaiting(std::exception_ptr()); + h1.doneWaiting(std::exception_ptr()); + } + waitTask.waitNoThrow(); + REQUIRE(count.load() == 2); + REQUIRE(waitTask.done()); + REQUIRE(not waitTask.exceptionPtr()); + } + + SECTION("Exceptions") { + SECTION("One async call") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + std::move(h2), [&count]() { throw std::runtime_error("error"); }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + REQUIRE_THROWS_WITH( + waitTask.wait(), + Catch::Contains("error") and Catch::Contains("StdException") and Catch::Contains("WaitingThreadPool test")); + REQUIRE(count.load() == 0); + } + + SECTION("Two async calls") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count]() { + if (count.fetch_add(1) == 0) { + throw cms::Exception("error 1"); + } + ++count; + }, + errorContext); + pool.runAsync( + h2, + [&count]() { + if (count.fetch_add(1) == 0) { + throw cms::Exception("error 2"); + } + ++count; + }, + errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + REQUIRE_THROWS_AS(waitTask.wait(), cms::Exception); + REQUIRE(count.load() == 3); + } + + SECTION("Two exceptions") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count]() { + ++count; + throw cms::Exception("error 1"); + }, + errorContext); + pool.runAsync( + h2, + [&count]() { + ++count; + throw cms::Exception("error 2"); + }, + errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h.doneWaiting(std::exception_ptr()); + } + REQUIRE_THROWS_AS(waitTask.wait(), cms::Exception); + REQUIRE(count.load() == 2); + } + + SECTION("Concurrent exceptions") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h1 = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count]() { + ++count; + throw cms::Exception("error 1"); + }, + errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + + auto h2 = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count]() { + ++count; + throw cms::Exception("error 2"); + }, + errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h2.doneWaiting(std::exception_ptr()); + h1.doneWaiting(std::exception_ptr()); + } + REQUIRE_THROWS_AS(waitTask.wait(), cms::Exception); + REQUIRE(count.load() == 2); + } + + SECTION("Concurrent exception and success") { + std::atomic count{0}; + + oneapi::tbb::task_group group; + edm::FinalWaitingTask waitTask{group}; + + { + using namespace edm::waiting_task::chain; + auto h1 = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, + [&count]() { + ++count; + throw cms::Exception("error 1"); + }, + errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + + auto h2 = first([&pool, &count](edm::WaitingTaskHolder h) { + edm::WaitingTaskWithArenaHolder h2(std::move(h)); + pool.runAsync( + h2, [&count]() { ++count; }, errorContext); + }) | + lastTask(edm::WaitingTaskHolder(group, &waitTask)); + h2.doneWaiting(std::exception_ptr()); + h1.doneWaiting(std::exception_ptr()); + } + REQUIRE_THROWS_AS(waitTask.wait(), cms::Exception); + REQUIRE(count.load() == 2); + } + } +} diff --git a/FWCore/Framework/src/defaultCmsRunServices.cc b/FWCore/Framework/src/defaultCmsRunServices.cc index 8ae184e0f8323..a26b9e8f16d29 100644 --- a/FWCore/Framework/src/defaultCmsRunServices.cc +++ b/FWCore/Framework/src/defaultCmsRunServices.cc @@ -26,7 +26,8 @@ namespace edm { "ResourceInformationService", "CPU", "CondorStatusService", - "XrdStatisticsService"}; + "XrdStatisticsService", + "AsyncService"}; return returnValue; } diff --git a/FWCore/Services/plugins/AsyncService.cc b/FWCore/Services/plugins/AsyncService.cc new file mode 100644 index 0000000000000..fa5ffa8327a8a --- /dev/null +++ b/FWCore/Services/plugins/AsyncService.cc @@ -0,0 +1,48 @@ +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" + +#include + +namespace edm::service { + class AsyncService : public Async { + public: + AsyncService(ParameterSet const& iConfig, ActivityRegistry& iRegistry); + + static void fillDescriptions(ConfigurationDescriptions& descriptions); + + private: + void ensureAllowed() const final; + + std::atomic allowed_ = true; + }; + + AsyncService::AsyncService(ParameterSet const& iConfig, ActivityRegistry& iRegistry) { + iRegistry.watchPreSourceEarlyTermination([this](TerminationOrigin) { allowed_ = false; }); + iRegistry.watchPreGlobalEarlyTermination([this](GlobalContext const&, TerminationOrigin) { allowed_ = false; }); + iRegistry.watchPreStreamEarlyTermination([this](StreamContext const&, TerminationOrigin) { allowed_ = false; }); + iRegistry.watchPostEndJob([this]() { allowed_ = false; }); + } + + void AsyncService::fillDescriptions(ConfigurationDescriptions& descriptions) { + ParameterSetDescription desc; + descriptions.addDefault(desc); + } + + void AsyncService::ensureAllowed() const { + if (not allowed_) { + cms::Exception ex("AsyncCallNotAllowed"); + ex.addContext("Calling Async::run()"); + ex << "Framework is shutting down, further run() calls are not allowed"; + throw ex; + } + } +} // namespace edm::service + +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" + +using edm::service::AsyncService; +using AsyncMaker = edm::serviceregistry::AllArgsMaker; +DEFINE_FWK_SERVICE_MAKER(AsyncService, AsyncMaker); diff --git a/FWCore/Services/test/AsyncServiceTester.cc b/FWCore/Services/test/AsyncServiceTester.cc new file mode 100644 index 0000000000000..f8ad24aaa8472 --- /dev/null +++ b/FWCore/Services/test/AsyncServiceTester.cc @@ -0,0 +1,252 @@ +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/Utilities/interface/Exception.h" + +#include +#include + +namespace edmtest { + class AsyncServiceTesterService { + public: + AsyncServiceTesterService(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iRegistry) : continue_{false} { + if (iConfig.getParameter("watchEarlyTermination")) { + iRegistry.watchPreSourceEarlyTermination([this](edm::TerminationOrigin) { release(); }); + iRegistry.watchPreGlobalEarlyTermination( + [this](edm::GlobalContext const&, edm::TerminationOrigin) { release(); }); + iRegistry.watchPreStreamEarlyTermination( + [this](edm::StreamContext const&, edm::TerminationOrigin) { release(); }); + } + if (iConfig.getParameter("watchStreamEndRun")) { + // StreamEndRun is the last stream transition in the data + // processing that does not depend on any global end + // transition + iRegistry.watchPostStreamEndRun([this](edm::StreamContext const&) { release(); }); + } + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.add("watchEarlyTermination", false) + ->setComment("If true, watch EarlyTermination signals to signal the waiters"); + desc.add("watchStreamEndRun", false)->setComment("If true, watch StreamEndRun signals to signal the waiters"); + descriptions.addDefault(desc); + } + + void wait() { + std::unique_lock lk(mutex_); + if (continue_) + return; + cond_.wait(lk, [this]() { return continue_; }); + } + + bool stillWaiting() const { + std::unique_lock lk(mutex_); + return not continue_; + } + + private: + void release() { + std::unique_lock lk(mutex_); + continue_ = true; + cond_.notify_all(); + } + + mutable std::mutex mutex_; + std::condition_variable cond_; + CMS_THREAD_GUARD(mutex_) bool continue_; + }; + + struct AsyncServiceTesterCache { + struct RunGuard { + RunGuard(std::atomic* c) : calls(c) {} + ~RunGuard() { + if (calls) { + --(*calls); + } + } + void release() { calls = nullptr; } + RunGuard(RunGuard const&) = delete; + RunGuard& operator=(RunGuard const&) = delete; + RunGuard(RunGuard&& o) = delete; + RunGuard& operator=(RunGuard&&) = delete; + + std::atomic* calls = nullptr; + }; + + RunGuard makeRunCallGuard(int inc) const { + outstandingRunCalls += inc; + return RunGuard(&outstandingRunCalls); + } + + mutable std::atomic outstandingRunCalls = 0; + }; + + class AsyncServiceTester + : public edm::stream::EDProducer> { + public: + AsyncServiceTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*) {} + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + descriptions.addDefault(desc); + } + + static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique(); } + + void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + if (status_ != 0) { + throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0"; + } + edm::Service as; + auto callGuard = globalCache()->makeRunCallGuard(1); + as->runAsync( + std::move(holder), + [this]() { + auto callGuard = globalCache()->makeRunCallGuard(0); + if (status_ != 0) { + throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0"; + } + ++status_; + }, + []() { return "Calling AsyncServiceTester::acquire()"; }); + callGuard.release(); + } + + void produce(edm::Event&, edm::EventSetup const&) final { + if (status_ != 1) { + throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1"; + } + status_ = 0; + } + + static void globalEndJob(AsyncServiceTesterCache* cache) { + if (cache->outstandingRunCalls != 0) { + throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls + << " runAsync() calls outstanding, expected 0"; + } + } + + private: + std::atomic status_ = 0; + }; + + class AsyncServiceWaitingTester : public edm::stream::EDProducer, + edm::stream::WatchLuminosityBlocks, + edm::stream::WatchRuns> { + public: + AsyncServiceWaitingTester(edm::ParameterSet const& iConfig, AsyncServiceTesterCache const*) + : throwingStream_(iConfig.getUntrackedParameter("throwingStream")), + waitEarlyTermination_(iConfig.getUntrackedParameter("waitEarlyTermination")), + waitStreamEndRun_(iConfig.getUntrackedParameter("waitStreamEndRun")) { + if (not waitEarlyTermination_ and not waitStreamEndRun_) { + throw cms::Exception("Configuration") + << "One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to True, both were False"; + } + if (waitEarlyTermination_ and waitStreamEndRun_) { + throw cms::Exception("Configuration") + << "Only one of 'waitEarlyTermination' and 'waitStreamEndRun' can be set to True, both were True"; + } + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.addUntracked("throwingStream") + ->setComment("ID of the stream where another module throws an exception"); + desc.addUntracked("waitEarlyTermination", false) + ->setComment( + "If true, use AsyncServiceTesterService in streams other than 'throwingStream' to wait launching the " + "async activity until an early termination signal has been issued"); + desc.addUntracked("waitStreamEndRun", false) + ->setComment( + "If true, wait in the async activity in streams other than 'throwingStream' until one stream has reached " + "streamEndRun"); + descriptions.addDefault(desc); + descriptions.setComment("One of 'waitEarlyTermination' and 'waitStreamEndRun' must be set to 'True'"); + } + + static auto initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique(); } + + void beginStream(edm::StreamID id) { streamId_ = id; } + + void acquire(edm::Event const&, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + bool const waitOnThisStream = *streamId_ != throwingStream_; + AsyncServiceTesterService* testService = nullptr; + if (waitOnThisStream) { + edm::Service tsh; + testService = &*tsh; + if (waitEarlyTermination_) + testService->wait(); + } + if (status_ != 0) { + throw cms::Exception("Assert") << "In acquire: status_ was " << status_ << ", expected 0"; + } + edm::Service as; + auto callGuard = globalCache()->makeRunCallGuard(1); + as->runAsync( + std::move(holder), + [this, testService]() { + auto callGuard = globalCache()->makeRunCallGuard(0); + if (testService and waitStreamEndRun_) { + testService->wait(); + } + + if (status_ != 0) { + throw cms::Exception("Assert") << "In async function: status_ was " << status_ << ", expected 0"; + } + ++status_; + }, + []() { return "Calling AsyncServiceTester::acquire()"; }); + callGuard.release(); + } + + void produce(edm::Event&, edm::EventSetup const&) final { + if (status_ != 1) { + throw cms::Exception("Assert") << "In analyze: status_ was " << status_ << ", expected 1"; + } + status_ = 0; + } + + void endLuminosityBlock(edm::LuminosityBlock const&, edm::EventSetup const&) final { + if (edm::Service()->stillWaiting() and *streamId_ != throwingStream_) { + throw cms::Exception("Assert") << "In endLuminosityBlock for stream " << *streamId_ + << " that is different from the throwing stream " << throwingStream_ + << " while the waits have not been signaled"; + } + } + + void endRun(edm::Run const&, edm::EventSetup const&) final { + if (edm::Service()->stillWaiting() and *streamId_ != throwingStream_) { + throw cms::Exception("Assert") << "In endRun for stream " << *streamId_ + << " that is different from the throwing stream " << throwingStream_ + << " while the waits have not been signaled"; + } + } + + static void globalEndJob(AsyncServiceTesterCache* cache) { + if (cache->outstandingRunCalls != 0) { + throw cms::Exception("Assert") << "In globalEndJob: " << cache->outstandingRunCalls + << " runAsync() calls outstanding, expected 0"; + } + } + + private: + std::atomic status_ = 0; + std::optional streamId_; + unsigned int const throwingStream_; + bool const waitEarlyTermination_; + bool const waitStreamEndRun_; + }; +} // namespace edmtest + +DEFINE_FWK_MODULE(edmtest::AsyncServiceTester); +DEFINE_FWK_MODULE(edmtest::AsyncServiceWaitingTester); + +DEFINE_FWK_SERVICE(edmtest::AsyncServiceTesterService); diff --git a/FWCore/Services/test/BuildFile.xml b/FWCore/Services/test/BuildFile.xml index 9ce9e4f67f1c7..28fa612e62259 100644 --- a/FWCore/Services/test/BuildFile.xml +++ b/FWCore/Services/test/BuildFile.xml @@ -14,9 +14,18 @@ + + + + + + + + + @@ -26,6 +35,7 @@ + diff --git a/FWCore/Services/test/test_asyncservice.sh b/FWCore/Services/test/test_asyncservice.sh new file mode 100755 index 0000000000000..c9b6a04125add --- /dev/null +++ b/FWCore/Services/test/test_asyncservice.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +function die { cat log.txt; echo $1: status $2 ; exit $2; } + +CONF=${SCRAM_TEST_PATH}/test_asyncservice_cfg.py +# Normal behavior +echo "cmsRun ${CONF}" +cmsRun ${CONF} > log.txt 2>&1 || die "Failure using ${CONF}" $? + +# Framework emits early termination signal, AsyncService should disallow run() calls +echo "cmsRun ${CONF} --earlyTermination" +cmsRun ${CONF} --earlyTermination > log.txt 2>&1 +RET=$? +if [ "${RET}" == "0" ]; then + cat log.txt + die "${CONF} --earlyTermination succeeded while it was expected to fail" 1 +fi +grep -q "ZombieKillerService" log.txt && die "${CONF} --earlyTermination was killed by ZombieKillerService, while the job should have failed by itself" 1 +grep -q "AsyncCallNotAllowed" log.txt || die "${CONF} --earlyTermination did not fail with AsyncCallNotAllowed" $? +grep -q "Framework is shutting down, further run() calls are not allowed" log.txt || die "${CONF} --earlyTermination did not contain expected earlyTermination message" $? + +# Another module throws an exception while an asynchronous function is +# running, ensure the framework to keep the data processing open until +# all asynchronous functions have returned +echo "cmsRun ${CONF} --exception" +cmsRun ${CONF} --exception > log.txt 2>&1 +RET=$? +if [ "${RET}" == "0" ]; then + cat log.txt + die "${CONF} --exception succeeded while it was expected to fail" 1 +fi +grep -q "ZombieKillerService" log.txt && die "${CONF} --exception was killed by ZombieKillerService" 1 +grep -q "MoreExceptions: AfterModEndJob" log.txt && die "${CONF} --exception threw an unexpected exception in EndJob" 1 +grep -q "Intentional 'NotFound' exception for testing purposes" log.txt || die "${CONF} --exception failed in unexpected way" $? + +exit 0 diff --git a/FWCore/Services/test/test_asyncservice_cfg.py b/FWCore/Services/test/test_asyncservice_cfg.py new file mode 100644 index 0000000000000..aadd8b55caf1d --- /dev/null +++ b/FWCore/Services/test/test_asyncservice_cfg.py @@ -0,0 +1,43 @@ +import FWCore.ParameterSet.Config as cms + +import argparse +import sys + +parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test AsyncService') +parser.add_argument("--earlyTermination", help="Test behavior of EarlyTermination signal on subsequent AsyncService::run() calls", action="store_true") +parser.add_argument("--exception", help="Another module throws an exception while asynchronous function is running", action="store_true") +args = parser.parse_args() + +process = cms.Process("TEST") + +process.maxEvents.input = 8 +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +process.source = cms.Source("EmptySource") + +if args.earlyTermination or args.exception: + process.tester = cms.EDProducer("edmtest::AsyncServiceWaitingTester", + throwingStream = cms.untracked.uint32(0) + ) + + # Make stream 0 always throw the exception in FailingProducer + process.streamFilter = cms.EDFilter("edmtest::StreamIDFilter", + rejectStreams = cms.vuint32(1,2,3) + ) + process.fail = cms.EDProducer("FailingProducer") + process.p2 = cms.Path(process.streamFilter+process.fail) + + testerService = cms.Service("edmtest::AsyncServiceTesterService") + if args.earlyTermination: + process.tester.waitEarlyTermination = cms.untracked.bool(True) + testerService.watchEarlyTermination = cms.bool(True) + elif args.exception: + process.tester.waitStreamEndRun = cms.untracked.bool(True) + testerService.watchStreamEndRun = cms.bool(True) + process.add_(testerService) +else: + process.tester = cms.EDProducer("edmtest::AsyncServiceTester") + +process.p = cms.Path(process.tester) + +process.add_(cms.Service("ZombieKillerService", secondsBetweenChecks=cms.untracked.uint32(5))) diff --git a/FWCore/TestModules/README.md b/FWCore/TestModules/README.md new file mode 100644 index 0000000000000..b8875b9d6b401 --- /dev/null +++ b/FWCore/TestModules/README.md @@ -0,0 +1,9 @@ +# General-purpose modules for testing purposes + +This package contains modules that are used in framework tests, but +are generic-enough to be usable outside of the framework as well. +Their interfaces are intended to be relatively stable. + +## `edmtest::StreamIDFilter` + +This module can be used to reject all events in specific streams. diff --git a/FWCore/TestModules/plugins/BuildFile.xml b/FWCore/TestModules/plugins/BuildFile.xml new file mode 100644 index 0000000000000..d59f00b4a326d --- /dev/null +++ b/FWCore/TestModules/plugins/BuildFile.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/FWCore/TestModules/plugins/StreamIDFilter.cc b/FWCore/TestModules/plugins/StreamIDFilter.cc new file mode 100644 index 0000000000000..0a32c8106c66c --- /dev/null +++ b/FWCore/TestModules/plugins/StreamIDFilter.cc @@ -0,0 +1,35 @@ +#include "FWCore/Framework/interface/global/EDFilter.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" + +namespace edmtest { + class StreamIDFilter : public edm::global::EDFilter<> { + public: + explicit StreamIDFilter(edm::ParameterSet const&); + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + bool filter(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const final; + + private: + std::vector rejectStreams_; + }; + + StreamIDFilter::StreamIDFilter(edm::ParameterSet const& ps) + : rejectStreams_(ps.getParameter>("rejectStreams")) {} + + bool StreamIDFilter::filter(edm::StreamID id, edm::Event&, edm::EventSetup const&) const { + return std::find(rejectStreams_.begin(), rejectStreams_.end(), id) == rejectStreams_.end(); + } + + void StreamIDFilter::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.add>("rejectStreams") + ->setComment("Stream IDs for which to reject events. If empty, all events are accepted."); + descriptions.addDefault(desc); + } +} // namespace edmtest + +DEFINE_FWK_MODULE(edmtest::StreamIDFilter); diff --git a/FWCore/Utilities/interface/ReusableObjectHolder.h b/FWCore/Utilities/interface/ReusableObjectHolder.h index 0fdf70bb2232e..bb53e407cc46c 100644 --- a/FWCore/Utilities/interface/ReusableObjectHolder.h +++ b/FWCore/Utilities/interface/ReusableObjectHolder.h @@ -88,7 +88,7 @@ namespace edm { : m_availableQueue(std::move(iOther.m_availableQueue)), m_outstandingObjects(0) { assert(0 == iOther.m_outstandingObjects); } - ~ReusableObjectHolder() { + ~ReusableObjectHolder() noexcept { assert(0 == m_outstandingObjects); std::unique_ptr item; while (m_availableQueue.try_pop(item)) { diff --git a/HeterogeneousCore/AlpakaCore/BuildFile.xml b/HeterogeneousCore/AlpakaCore/BuildFile.xml index fc6f633654d6d..895830a9dddd0 100644 --- a/HeterogeneousCore/AlpakaCore/BuildFile.xml +++ b/HeterogeneousCore/AlpakaCore/BuildFile.xml @@ -1,4 +1,6 @@ + + diff --git a/HeterogeneousCore/AlpakaCore/interface/EventCache.h b/HeterogeneousCore/AlpakaCore/interface/EventCache.h index 2701a739222f9..f6bc3ba69b720 100644 --- a/HeterogeneousCore/AlpakaCore/interface/EventCache.h +++ b/HeterogeneousCore/AlpakaCore/interface/EventCache.h @@ -69,7 +69,11 @@ namespace cms::alpakatools { private: std::shared_ptr makeOrGet(Device dev) { - return cache_[alpaka::getNativeHandle(dev)].makeOrGet([dev]() { return std::make_unique(dev); }); + return cache_[alpaka::getNativeHandle(dev)].makeOrGet([dev]() { + // We want non-busy waits + bool constexpr busyWait = false; + return std::make_unique(dev, busyWait); + }); } // not thread safe, intended to be called only from AlpakaService diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h index 016df5d9ad59e..19687ca980eb5 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h @@ -8,7 +8,6 @@ #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" -#include "HeterogeneousCore/AlpakaInterface/interface/HostOnlyTask.h" namespace ALPAKA_ACCELERATOR_NAMESPACE { /** diff --git a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc index 340bb6cb7083d..b6c3c36af6041 100644 --- a/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc +++ b/HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc @@ -1,5 +1,7 @@ #include +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/interface/EDMException.h" #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h" @@ -30,11 +32,12 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { } void EDMetadata::enqueueCallback(edm::WaitingTaskWithArenaHolder holder) { - alpaka::enqueue(*queue_, alpaka::HostOnlyTask([holder = std::move(holder)](std::exception_ptr eptr) { - // The functor is required to be const, but the original waitingTaskHolder_ - // needs to be notified... - const_cast(holder).doneWaiting(eptr); - })); + edm::Service async; + recordEvent(); + async->runAsync( + std::move(holder), + [event = event_]() mutable { alpaka::wait(*event); }, + []() { return "Enqueued via " EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE) "::EDMetadata::enqueueCallback()"; }); } void EDMetadata::synchronize(EDMetadata& consumer, bool tryReuseQueue) const { diff --git a/HeterogeneousCore/CUDACore/src/ScopedContext.cc b/HeterogeneousCore/CUDACore/src/ScopedContext.cc index ccf7995a20061..68e0f869bf3d2 100644 --- a/HeterogeneousCore/CUDACore/src/ScopedContext.cc +++ b/HeterogeneousCore/CUDACore/src/ScopedContext.cc @@ -1,5 +1,6 @@ #include "HeterogeneousCore/CUDACore/interface/ScopedContext.h" +#include "FWCore/Concurrency/interface/Async.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/interface/Exception.h" @@ -8,34 +9,6 @@ #include "chooseDevice.h" -namespace { - struct CallbackData { - edm::WaitingTaskWithArenaHolder holder; - int device; - }; - - void CUDART_CB cudaScopedContextCallback(cudaStream_t streamId, cudaError_t status, void* data) { - std::unique_ptr guard{reinterpret_cast(data)}; - edm::WaitingTaskWithArenaHolder& waitingTaskHolder = guard->holder; - int device = guard->device; - if (status == cudaSuccess) { - LogTrace("ScopedContext") << " GPU kernel finished (in callback) device " << device << " CUDA stream " - << streamId; - waitingTaskHolder.doneWaiting(nullptr); - } else { - // wrap the exception in a try-catch block to let GDB "catch throw" break on it - try { - auto error = cudaGetErrorName(status); - auto message = cudaGetErrorString(status); - throw cms::Exception("CUDAError") << "Callback of CUDA stream " << streamId << " in device " << device - << " error " << error << ": " << message; - } catch (cms::Exception&) { - waitingTaskHolder.doneWaiting(std::current_exception()); - } - } - } -} // namespace - namespace cms::cuda { namespace impl { ScopedContextBase::ScopedContextBase(edm::StreamID streamID) : currentDevice_(chooseDevice(streamID)) { @@ -83,8 +56,13 @@ namespace cms::cuda { } void ScopedContextHolderHelper::enqueueCallback(int device, cudaStream_t stream) { - cudaCheck( - cudaStreamAddCallback(stream, cudaScopedContextCallback, new CallbackData{waitingTaskHolder_, device}, 0)); + edm::Service async; + SharedEventPtr event = getEventCache().get(); + cudaCheck(cudaEventRecord(event.get(), stream)); + async->runAsync( + std::move(waitingTaskHolder_), + [event = std::move(event)]() mutable { cudaCheck(cudaEventSynchronize(event.get())); }, + []() { return "Enqueued by cms::cuda::ScopedContextHolderHelper::enqueueCallback()"; }); } } // namespace impl diff --git a/HeterogeneousCore/CUDACore/test/test_main.cc b/HeterogeneousCore/CUDACore/test/test_main.cc index 2e1027598a4de..85e19019bc738 100644 --- a/HeterogeneousCore/CUDACore/test/test_main.cc +++ b/HeterogeneousCore/CUDACore/test/test_main.cc @@ -17,6 +17,7 @@ class ServiceRegistryListener : public Catch::TestEventListenerBase { R"_(import FWCore.ParameterSet.Config as cms process = cms.Process('Test') process.CUDAService = cms.Service('CUDAService') +process.AsyncService = cms.Service('AsyncService') )_"}; std::unique_ptr params; diff --git a/HeterogeneousCore/CUDAUtilities/src/EventCache.cc b/HeterogeneousCore/CUDAUtilities/src/EventCache.cc index a80cfdd412ec5..5682c3fe2bc96 100644 --- a/HeterogeneousCore/CUDAUtilities/src/EventCache.cc +++ b/HeterogeneousCore/CUDAUtilities/src/EventCache.cc @@ -46,7 +46,9 @@ namespace cms::cuda { return cache_[dev].makeOrGet([dev]() { cudaEvent_t event; // it should be a bit faster to ignore timings - cudaCheck(cudaEventCreateWithFlags(&event, cudaEventDisableTiming)); + // cudaEventBlockingSync is needed to let the thread calling + // cudaEventSynchronize() to sleep instead of spinning the CPU + cudaCheck(cudaEventCreateWithFlags(&event, cudaEventDisableTiming | cudaEventBlockingSync)); return std::unique_ptr(event, Deleter{dev}); }); }