From e9c55beec8035bbd59c7c6a72a47aa314b540897 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Sat, 29 Aug 2020 13:26:27 -0500 Subject: [PATCH 01/10] Switch to using tbb::global_control --- FWCore/Concurrency/test/TestTBB.cc | 95 ++++++++++------------ FWCore/Utilities/test/RunningAverage_t.cpp | 7 +- 2 files changed, 49 insertions(+), 53 deletions(-) diff --git a/FWCore/Concurrency/test/TestTBB.cc b/FWCore/Concurrency/test/TestTBB.cc index 6eff8716892fb..a5f2afcbcfa2d 100644 --- a/FWCore/Concurrency/test/TestTBB.cc +++ b/FWCore/Concurrency/test/TestTBB.cc @@ -1,6 +1,6 @@ #include #include -#include "tbb/task_scheduler_init.h" +#include "tbb/global_control.h" #include "tbb/parallel_for.h" #include "tbb/blocked_range.h" @@ -8,81 +8,76 @@ using namespace tbb; using namespace std; class ArraySummer { - - int * p_array_a; - int * p_array_b; - int * p_array_sum; + int* p_array_a; + int* p_array_b; + int* p_array_sum; public: // This empty constructor with an initialization list is used to setup calls to the function - ArraySummer(int * p_a, int * p_b, int * p_sum) : p_array_a(p_a), p_array_b(p_b), p_array_sum(p_sum) { } + ArraySummer(int* p_a, int* p_b, int* p_sum) : p_array_a(p_a), p_array_b(p_b), p_array_sum(p_sum) {} - void operator() ( const blocked_range& r ) const { - for ( int i = r.begin(); i != r.end(); i++ ) { // iterates over the entire chunk + void operator()(const blocked_range& r) const { + for (int i = r.begin(); i != r.end(); i++) { // iterates over the entire chunk p_array_sum[i] = p_array_a[i] + p_array_b[i]; } } - }; -int main(int argc, char *argv[]) { - int * p_A; - int * p_B; - int * p_SUM_1T; - int * p_SUM_TBB; +int main(int argc, char* argv[]) { + int* p_A; + int* p_B; + int* p_SUM_1T; + int* p_SUM_TBB; - /* This is the TBB runtime... */ - task_scheduler_init init; + /* This is the TBB runtime... */ + global_control control(global_control::max_allowed_parallelism, 4); - constexpr int nElements = 10; + constexpr int nElements = 10; - p_A = new int[nElements]; - p_B = new int[nElements]; - p_SUM_1T = new int[nElements]; - p_SUM_TBB = new int[nElements]; + p_A = new int[nElements]; + p_B = new int[nElements]; + p_SUM_1T = new int[nElements]; + p_SUM_TBB = new int[nElements]; - /* + /* * Initialize the data sets ... could do this in parallel too, but * serial is easier to read */ - p_A[0] = p_B[0] = 0; - p_A[1] = p_B[1] = 1; - for( int i=2;i(0, nElements, 100), - ArraySummer( p_A, p_B, p_SUM_TBB ) ); + parallel_for(blocked_range(0, nElements, 100), ArraySummer(p_A, p_B, p_SUM_TBB)); - /* + /* * Verify the sums match */ - for(int i=0;i #include #include @@ -18,8 +19,8 @@ namespace { namespace test_average { namespace running_average { int test() { - //tbb::task_scheduler_init init; // Automatic number of threads - tbb::task_scheduler_init init(tbb::task_scheduler_init::default_num_threads()); // Explicit number of threads + tbb::global_control control(tbb::global_control::max_allowed_parallelism, + tbb::this_task_arena::max_concurrency()); // Explicit number of threads // std::random_device rd; std::mt19937 e2; // (rd()); From 5c29b9c3a34e7b5fc1b33eb9a0b02e8804e85147 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Mon, 31 Aug 2020 09:06:48 -0500 Subject: [PATCH 02/10] Use edm::ThreadsController to constrain threads Replace explicit call to tbb::task_scheduler_init with new edm::ThreadsController. --- .../Concurrency/interface/ThreadsController.h | 60 +++++++++++++++++++ FWCore/Concurrency/interface/setNThreads.h | 6 +- FWCore/Concurrency/src/ThreadsController.cc | 32 ++++++++++ FWCore/Concurrency/src/setNThreads.cc | 10 ++-- FWCore/Framework/bin/cmsRun.cpp | 8 +-- FWCore/Framework/bin/cmsRunPython3.cpp | 8 +-- FWCore/Framework/test/callback_t.cppunit.cc | 7 +-- .../test/dependentrecord_t.cppunit.cc | 7 +-- FWCore/Framework/test/esproducer_t.cppunit.cc | 7 +-- FWCore/Framework/test/eventsetup_t.cppunit.cc | 6 +- FWCore/Framework/test/fullchain_t.cppunit.cc | 6 +- .../Integration/test/standalone_t.cppunit.cc | 6 +- .../src/PythonEventProcessor.cc | 4 +- FWCore/TestProcessor/src/TestProcessor.cc | 6 +- 14 files changed, 127 insertions(+), 46 deletions(-) create mode 100644 FWCore/Concurrency/interface/ThreadsController.h create mode 100644 FWCore/Concurrency/src/ThreadsController.cc diff --git a/FWCore/Concurrency/interface/ThreadsController.h b/FWCore/Concurrency/interface/ThreadsController.h new file mode 100644 index 0000000000000..cfe9b7346792b --- /dev/null +++ b/FWCore/Concurrency/interface/ThreadsController.h @@ -0,0 +1,60 @@ +#ifndef FWCore_Concurrency_ThreadsController_h +#define FWCore_Concurrency_ThreadsController_h +// -*- C++ -*- +// +// Package: FWCore/Concurrency +// Class : ThreadsController +// +/**\class ThreadsController ThreadsController.h "ThreadsController.h" + + Description: Controls how many threads and how much stack memory per thread + + Usage: + The lifetime of the ThreadsController sets how long the options are in use. + +*/ +// +// Original Author: FWCore +// Created: Fri, 18 Nov 2016 20:30:42 GMT +// + +// system include files +#include +#include + +// user include files + +// forward declarations + +namespace edm { + class ThreadsController { + public: + ThreadsController() = delete; + explicit ThreadsController(size_t iNThreads) + : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, + m_oversubscriber{makeOversubscriber(iNThreads)} {} + ThreadsController(size_t iNThreads, size_t iStackSize) + : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, + m_oversubscriber{makeOversubscriber(iNThreads)} { + setStackSize(iStackSize); + } + + // ---------- member functions --------------------------- + void setStackSize(size_t iStackSize) { + m_stackSize = std::make_unique(tbb::global_control::thread_stack_size, iStackSize); + } + + private: + struct Destructor { + void operator()(void*) const; + }; + static std::unique_ptr makeOversubscriber(size_t iNThreads); + friend class std::unique_ptr; + // ---------- member data -------------------------------- + tbb::global_control m_nThreads; + std::unique_ptr m_stackSize; + std::unique_ptr m_oversubscriber; + }; +} // namespace edm + +#endif diff --git a/FWCore/Concurrency/interface/setNThreads.h b/FWCore/Concurrency/interface/setNThreads.h index 7ac0acfd2ecfd..dc6a616de1bc6 100644 --- a/FWCore/Concurrency/interface/setNThreads.h +++ b/FWCore/Concurrency/interface/setNThreads.h @@ -7,11 +7,9 @@ // Created by Chris Jones on 7/24/20. // #include -#include "tbb/task_scheduler_init.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" namespace edm { - unsigned int setNThreads(unsigned int iNThreads, - unsigned int iStackSize, - std::unique_ptr& oPtr); + unsigned int setNThreads(unsigned int iNThreads, unsigned int iStackSize, std::unique_ptr& oPtr); } #endif /* FWCore_Concurrency_setNThreads_h */ diff --git a/FWCore/Concurrency/src/ThreadsController.cc b/FWCore/Concurrency/src/ThreadsController.cc new file mode 100644 index 0000000000000..829c4a8e0541d --- /dev/null +++ b/FWCore/Concurrency/src/ThreadsController.cc @@ -0,0 +1,32 @@ +// -*- C++ -*- +// +// Package: FWCore/Concurrency +// Class : ThreadsController +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Christopher Jones +// Created: Fri, 28 Aug 2020 19:42:30 GMT +// + +// system include files +#include "tbb/task_scheduler_init.h" + +// user include files +#include "FWCore/Concurrency/interface/ThreadsController.h" + +//NOTE: The only way at present to oversubscribe the number of TBB threads to cores is to +// use a tbb::task_scheduler_init. +namespace edm { + + void ThreadsController::Destructor::operator()(void* iThis) const { + delete static_cast(iThis); + } + + std::unique_ptr ThreadsController::makeOversubscriber(size_t iNThreads) { + return std::unique_ptr( + static_cast(new tbb::task_scheduler_init(iNThreads))); + } + +} // namespace edm diff --git a/FWCore/Concurrency/src/setNThreads.cc b/FWCore/Concurrency/src/setNThreads.cc index cfa96ab5823c6..26470bdf51a15 100644 --- a/FWCore/Concurrency/src/setNThreads.cc +++ b/FWCore/Concurrency/src/setNThreads.cc @@ -4,13 +4,11 @@ // // Created by Chris Jones on 7/24/20. // - +#include "tbb/task_arena.h" #include "FWCore/Concurrency/interface/setNThreads.h" namespace edm { - unsigned int setNThreads(unsigned int iNThreads, - unsigned int iStackSize, - std::unique_ptr& oPtr) { + unsigned int setNThreads(unsigned int iNThreads, unsigned int iStackSize, std::unique_ptr& oPtr) { //The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's // destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads. // This provides a clean synchronization point. @@ -23,9 +21,9 @@ namespace edm { oPtr.reset(); if (0 == iNThreads) { //Allow TBB to decide how many threads. This is normally the number of CPUs in the machine. - iNThreads = tbb::task_scheduler_init::default_num_threads(); + iNThreads = tbb::this_task_arena::max_concurrency(); } - oPtr = std::make_unique(static_cast(iNThreads), iStackSize); + oPtr = std::make_unique(static_cast(iNThreads), iStackSize); return iNThreads; } diff --git a/FWCore/Framework/bin/cmsRun.cpp b/FWCore/Framework/bin/cmsRun.cpp index 0fb5876fd4132..8413662172e6f 100644 --- a/FWCore/Framework/bin/cmsRun.cpp +++ b/FWCore/Framework/bin/cmsRun.cpp @@ -20,6 +20,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/ServiceRegistry/interface/ServiceToken.h" #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h" #include "FWCore/Concurrency/interface/setNThreads.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/ConvertException.h" @@ -30,7 +31,6 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "TError.h" #include "boost/program_options.hpp" -#include "tbb/task_scheduler_init.h" #include #include @@ -102,10 +102,8 @@ int main(int argc, char* argv[]) { bool alwaysAddContext = true; //Default to only use 1 thread. We define this early (before parsing the command line options // and python configuration) since the plugin system or message logger may be using TBB. - //NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job - // else we get a crash. So for now we can't have any services use tasks in their constructors. - std::unique_ptr tsiPtr = std::make_unique( - edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024); + auto tsiPtr = std::make_unique(edm::s_defaultNumberOfThreads, + edm::s_defaultSizeOfStackForThreadsInKB * 1024); std::shared_ptr theMessageServicePresence; std::unique_ptr jobReportStreamPtr; std::shared_ptr > jobRep; diff --git a/FWCore/Framework/bin/cmsRunPython3.cpp b/FWCore/Framework/bin/cmsRunPython3.cpp index 7bc4ed6b25e1d..e1a78a83a57b5 100644 --- a/FWCore/Framework/bin/cmsRunPython3.cpp +++ b/FWCore/Framework/bin/cmsRunPython3.cpp @@ -20,6 +20,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/ServiceRegistry/interface/ServiceToken.h" #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h" #include "FWCore/Concurrency/interface/setNThreads.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/ConvertException.h" @@ -30,7 +31,6 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/Utilities/interface/thread_safety_macros.h" #include "boost/program_options.hpp" -#include "tbb/task_scheduler_init.h" #include #include @@ -102,10 +102,8 @@ int main(int argc, char* argv[]) { bool alwaysAddContext = true; //Default to only use 1 thread. We define this early (before parsing the command line options // and python configuration) since the plugin system or message logger may be using TBB. - //NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job - // else we get a crash. So for now we can't have any services use tasks in their constructors. - std::unique_ptr tsiPtr = std::make_unique( - edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024); + auto tsiPtr = std::make_unique(edm::s_defaultNumberOfThreads, + edm::s_defaultSizeOfStackForThreadsInKB * 1024); std::shared_ptr theMessageServicePresence; std::unique_ptr jobReportStreamPtr; std::shared_ptr > jobRep; diff --git a/FWCore/Framework/test/callback_t.cppunit.cc b/FWCore/Framework/test/callback_t.cppunit.cc index ca9a98c69ca16..1625dfe729ade 100644 --- a/FWCore/Framework/test/callback_t.cppunit.cc +++ b/FWCore/Framework/test/callback_t.cppunit.cc @@ -10,8 +10,7 @@ #include "FWCore/Utilities/interface/do_nothing_deleter.h" #include "FWCore/Framework/interface/Callback.h" #include "FWCore/Framework/interface/ESProducts.h" - -#include "tbb/task_scheduler_init.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include #include @@ -121,7 +120,7 @@ class testCallback : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE_END(); public: - void setUp() { m_scheduler = std::make_unique(1); } + void setUp() { m_scheduler = std::make_unique(1); } void tearDown() {} void uniquePtrTest(); @@ -129,7 +128,7 @@ class testCallback : public CppUnit::TestFixture { void ptrProductsTest(); private: - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; }; ///registration of the test so that the runner can find it diff --git a/FWCore/Framework/test/dependentrecord_t.cppunit.cc b/FWCore/Framework/test/dependentrecord_t.cppunit.cc index 987cf5045f9fe..f3b13d9248a5c 100644 --- a/FWCore/Framework/test/dependentrecord_t.cppunit.cc +++ b/FWCore/Framework/test/dependentrecord_t.cppunit.cc @@ -30,11 +30,10 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include "cppunit/extensions/HelperMacros.h" -#include "tbb/task_scheduler_init.h" - #include #include #include @@ -77,7 +76,7 @@ class testdependentrecord : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE_END(); public: - void setUp() { m_scheduler = std::make_unique(1); } + void setUp() { m_scheduler = std::make_unique(1); } void tearDown() {} void dependentConstructorTest(); @@ -100,7 +99,7 @@ class testdependentrecord : public CppUnit::TestFixture { void extendIOVTest(); private: - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; }; //Cppunit class declaration over diff --git a/FWCore/Framework/test/esproducer_t.cppunit.cc b/FWCore/Framework/test/esproducer_t.cppunit.cc index d622268bbaa2d..5e976c12816db 100644 --- a/FWCore/Framework/test/esproducer_t.cppunit.cc +++ b/FWCore/Framework/test/esproducer_t.cppunit.cc @@ -26,8 +26,7 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Utilities/interface/do_nothing_deleter.h" #include "FWCore/Utilities/interface/Exception.h" - -#include "tbb/task_scheduler_init.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include #include @@ -67,7 +66,7 @@ class testEsproducer : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE_END(); public: - void setUp() { m_scheduler = std::make_unique(1); } + void setUp() { m_scheduler = std::make_unique(1); } void tearDown() {} void registerTest(); @@ -83,7 +82,7 @@ class testEsproducer : public CppUnit::TestFixture { void dataProxyProviderTest(); private: - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; class Test1Producer : public ESProducer { public: diff --git a/FWCore/Framework/test/eventsetup_t.cppunit.cc b/FWCore/Framework/test/eventsetup_t.cppunit.cc index a9c3a01ee9dad..d8ffb33a77bc6 100644 --- a/FWCore/Framework/test/eventsetup_t.cppunit.cc +++ b/FWCore/Framework/test/eventsetup_t.cppunit.cc @@ -42,9 +42,9 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/ESProductTag.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include "cppunit/extensions/HelperMacros.h" -#include "tbb/task_scheduler_init.h" #include #include @@ -104,7 +104,7 @@ class testEventsetup : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE_END(); public: - void setUp() { m_scheduler = std::make_unique(1); } + void setUp() { m_scheduler = std::make_unique(1); } void tearDown() {} void constructTest(); @@ -136,7 +136,7 @@ class testEventsetup : public CppUnit::TestFixture { void resetProxiesTest(); private: - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; DummyData kGood{1}; DummyData kBad{0}; diff --git a/FWCore/Framework/test/fullchain_t.cppunit.cc b/FWCore/Framework/test/fullchain_t.cppunit.cc index d56610de43abc..b05f877c15c25 100644 --- a/FWCore/Framework/test/fullchain_t.cppunit.cc +++ b/FWCore/Framework/test/fullchain_t.cppunit.cc @@ -21,9 +21,9 @@ #include "FWCore/Framework/src/EventSetupsController.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" #include "cppunit/extensions/HelperMacros.h" -#include "tbb/task_scheduler_init.h" #include #include @@ -54,13 +54,13 @@ class testfullChain : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE_END(); public: - void setUp() { m_scheduler = std::make_unique(1); } + void setUp() { m_scheduler = std::make_unique(1); } void tearDown() {} void getfromDataproxyproviderTest(); private: - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; }; ///registration of the test so that the runner can find it diff --git a/FWCore/Integration/test/standalone_t.cppunit.cc b/FWCore/Integration/test/standalone_t.cppunit.cc index c4c726a1a7e8b..c5a9b843fc7e0 100644 --- a/FWCore/Integration/test/standalone_t.cppunit.cc +++ b/FWCore/Integration/test/standalone_t.cppunit.cc @@ -14,11 +14,11 @@ if the MessageLogger is not runnning. #include "FWCore/Framework/interface/EventProcessor.h" #include "FWCore/ParameterSetReader/interface/ParameterSetReader.h" #include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" // #include "FWCore/Utilities/interface/Presence.h" // #include "FWCore/PluginManager/interface/PresenceFactory.h" #include -#include "tbb/task_scheduler_init.h" #include #include @@ -31,7 +31,7 @@ class testStandalone : public CppUnit::TestFixture { public: void setUp() { m_handler = std::make_unique(); - m_scheduler = std::make_unique(1); + m_scheduler = std::make_unique(1); } void tearDown() { @@ -42,7 +42,7 @@ class testStandalone : public CppUnit::TestFixture { private: edm::propagate_const> m_handler; - edm::propagate_const> m_scheduler; + edm::propagate_const> m_scheduler; }; ///registration of the test so that the runner can find it diff --git a/FWCore/PythonFramework/src/PythonEventProcessor.cc b/FWCore/PythonFramework/src/PythonEventProcessor.cc index 016ffdea19103..0e80bcc677d3c 100644 --- a/FWCore/PythonFramework/src/PythonEventProcessor.cc +++ b/FWCore/PythonFramework/src/PythonEventProcessor.cc @@ -43,8 +43,8 @@ namespace { return iDesc; } - //TBB only allows 1 task_scheduler_init active on a thread. - CMS_THREAD_SAFE std::unique_ptr tsiPtr; + //Only one ThreadsController can be active at a time + CMS_THREAD_SAFE std::unique_ptr tsiPtr; std::shared_ptr setupThreading(std::shared_ptr iDesc) { // check the "options" ParameterSet diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index 60e491415285c..a99b55fc79ef1 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -42,9 +42,9 @@ #include "FWCore/Utilities/interface/ExceptionCollector.h" -#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" -#include "tbb/task_scheduler_init.h" +#include "DataFormats/Provenance/interface/ParentageRegistry.h" #define xstr(s) str(s) #define str(s) #s @@ -64,7 +64,7 @@ namespace edm { bool oneTimeInitializationImpl() { edmplugin::PluginManager::configure(edmplugin::standard::config()); - static std::unique_ptr tsiPtr = std::make_unique(1); + static std::unique_ptr tsiPtr = std::make_unique(1); // register the empty parentage vector , once and for all ParentageRegistry::instance()->insertMapped(Parentage()); From 654bd25fbac7ab7f04babd1f0e0c847e8a47027a Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Mon, 31 Aug 2020 09:33:48 -0500 Subject: [PATCH 03/10] Use tbb::global_control to determine if using only 1 thread Also make sure waiting task is spawned back into the same task_arena active during the original call. --- FWCore/Framework/src/Worker.cc | 63 ++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/FWCore/Framework/src/Worker.cc b/FWCore/Framework/src/Worker.cc index 470b51e193d95..eddd1e5921030 100644 --- a/FWCore/Framework/src/Worker.cc +++ b/FWCore/Framework/src/Worker.cc @@ -10,6 +10,7 @@ #include "FWCore/Concurrency/interface/WaitingTaskHolder.h" #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" #include "FWCore/Framework/src/esTaskArenas.h" +#include "tbb/global_control.h" namespace edm { namespace { @@ -295,36 +296,40 @@ namespace edm { // default tbb arena. It will not process any tasks on the es arena. We need to add a // task that will synchronously do a wait_for_all in the es arena to be sure prefetching // will work. - if UNLIKELY (tbb::this_task_arena::max_concurrency() == 1) { + + if UNLIKELY (tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) == 1) { //We spawn this first so that the other ES tasks are before it in the TBB queue - tbb::task::spawn(*make_functor_task( - tbb::task::allocate_root(), [this, task = edm::WaitingTaskHolder(iTask), iTrans, &iImpl, iToken]() mutable { - auto waitTask = edm::make_empty_waiting_task(); - waitTask->set_ref_count(2); - auto waitTaskPtr = waitTask.get(); - esTaskArena().execute([waitTaskPtr, this, iTrans, &iImpl, iToken]() { - auto const& recs = esRecordsToGetFrom(iTrans); - auto const& items = esItemsToGetFrom(iTrans); - waitTaskPtr->set_ref_count(2); - for (size_t i = 0; i != items.size(); ++i) { - if (recs[i] != ESRecordIndex{}) { - auto rec = iImpl.findImpl(recs[i]); - if (rec) { - rec->prefetchAsync(waitTaskPtr, items[i], &iImpl, iToken); - } - } - } - waitTaskPtr->decrement_ref_count(); - waitTaskPtr->wait_for_all(); - }); - - auto exPtr = waitTask->exceptionPtr(); - if (exPtr) { - task.doneWaiting(*exPtr); - } else { - task.doneWaiting(std::exception_ptr{}); - } - })); + tbb::task_arena edArena(tbb::task_arena::attach{}); + tbb::task::spawn( + *make_functor_task(tbb::task::allocate_root(), + [this, task = edm::WaitingTaskHolder(iTask), iTrans, &iImpl, iToken, edArena]() mutable { + esTaskArena().execute([this, iTrans, &iImpl, iToken, task = std::move(task), edArena]() { + auto waitTask = edm::make_empty_waiting_task(); + auto const& recs = esRecordsToGetFrom(iTrans); + auto const& items = esItemsToGetFrom(iTrans); + waitTask->set_ref_count(2); + for (size_t i = 0; i != items.size(); ++i) { + if (recs[i] != ESRecordIndex{}) { + auto rec = iImpl.findImpl(recs[i]); + if (rec) { + rec->prefetchAsync(waitTask.get(), items[i], &iImpl, iToken); + } + } + } + waitTask->decrement_ref_count(); + waitTask->wait_for_all(); + + auto exPtr = waitTask->exceptionPtr(); + tbb::task_arena(edArena).execute([task, exPtr]() { + auto t = task; + if (exPtr) { + t.doneWaiting(*exPtr); + } else { + t.doneWaiting(std::exception_ptr{}); + } + }); + }); + })); } else { //We need iTask to run in the default arena since it is not an ES task auto task = From e29b7970ac0512c8d601b492a99c03c9bfda9bf6 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 1 Sep 2020 09:06:33 -0500 Subject: [PATCH 04/10] Use explicit tbb::task_arena to control number of threads Using tbb::task_arena together with tbb::global_control allows for oversubscribing the number of threads without having to use the deprecated tbb::task_scheduler_init. --- .../Concurrency/interface/ThreadsController.h | 27 +-- FWCore/Concurrency/src/ThreadsController.cc | 13 +- FWCore/Concurrency/src/setNThreads.cc | 6 - FWCore/Framework/bin/cmsRun.cpp | 46 +++-- FWCore/Framework/bin/cmsRunPython3.cpp | 45 +++-- .../test/stubs/TestNThreadsChecker.cc | 4 +- .../src/PythonEventProcessor.cc | 4 +- .../TestProcessor/interface/TestProcessor.h | 52 +++-- FWCore/TestProcessor/src/TestProcessor.cc | 183 ++++++++++-------- 9 files changed, 198 insertions(+), 182 deletions(-) diff --git a/FWCore/Concurrency/interface/ThreadsController.h b/FWCore/Concurrency/interface/ThreadsController.h index cfe9b7346792b..4d4a7f875e8d5 100644 --- a/FWCore/Concurrency/interface/ThreadsController.h +++ b/FWCore/Concurrency/interface/ThreadsController.h @@ -19,7 +19,8 @@ // // system include files -#include +#include "tbb/global_control.h" +#include "tbb/task_arena.h" #include // user include files @@ -30,30 +31,20 @@ namespace edm { class ThreadsController { public: ThreadsController() = delete; - explicit ThreadsController(size_t iNThreads) - : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, - m_oversubscriber{makeOversubscriber(iNThreads)} {} - ThreadsController(size_t iNThreads, size_t iStackSize) - : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, - m_oversubscriber{makeOversubscriber(iNThreads)} { - setStackSize(iStackSize); - } + explicit ThreadsController(unsigned int iNThreads) + : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, m_stackSize{} {} + ThreadsController(unsigned int iNThreads, size_t iStackSize) + : m_nThreads{tbb::global_control::max_allowed_parallelism, iNThreads}, m_stackSize{makeStackSize(iStackSize)} {} // ---------- member functions --------------------------- - void setStackSize(size_t iStackSize) { - m_stackSize = std::make_unique(tbb::global_control::thread_stack_size, iStackSize); - } + void setStackSize(size_t iStackSize) { m_stackSize = makeStackSize(iStackSize); } private: - struct Destructor { - void operator()(void*) const; - }; - static std::unique_ptr makeOversubscriber(size_t iNThreads); - friend class std::unique_ptr; + static std::unique_ptr makeStackSize(size_t iStackSize); + // ---------- member data -------------------------------- tbb::global_control m_nThreads; std::unique_ptr m_stackSize; - std::unique_ptr m_oversubscriber; }; } // namespace edm diff --git a/FWCore/Concurrency/src/ThreadsController.cc b/FWCore/Concurrency/src/ThreadsController.cc index 829c4a8e0541d..45658a49fd324 100644 --- a/FWCore/Concurrency/src/ThreadsController.cc +++ b/FWCore/Concurrency/src/ThreadsController.cc @@ -11,22 +11,13 @@ // // system include files -#include "tbb/task_scheduler_init.h" // user include files #include "FWCore/Concurrency/interface/ThreadsController.h" -//NOTE: The only way at present to oversubscribe the number of TBB threads to cores is to -// use a tbb::task_scheduler_init. namespace edm { - - void ThreadsController::Destructor::operator()(void* iThis) const { - delete static_cast(iThis); - } - - std::unique_ptr ThreadsController::makeOversubscriber(size_t iNThreads) { - return std::unique_ptr( - static_cast(new tbb::task_scheduler_init(iNThreads))); + std::unique_ptr ThreadsController::makeStackSize(size_t iStackSize) { + return std::make_unique(tbb::global_control::thread_stack_size, iStackSize); } } // namespace edm diff --git a/FWCore/Concurrency/src/setNThreads.cc b/FWCore/Concurrency/src/setNThreads.cc index 26470bdf51a15..06de4646870f0 100644 --- a/FWCore/Concurrency/src/setNThreads.cc +++ b/FWCore/Concurrency/src/setNThreads.cc @@ -9,12 +9,6 @@ namespace edm { unsigned int setNThreads(unsigned int iNThreads, unsigned int iStackSize, std::unique_ptr& oPtr) { - //The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's - // destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads. - // This provides a clean synchronization point. - //We have to destroy the old scheduler before starting a new one in order to - // get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads - //stack size is given in KB but passed in as bytes iStackSize *= 1024; diff --git a/FWCore/Framework/bin/cmsRun.cpp b/FWCore/Framework/bin/cmsRun.cpp index 8413662172e6f..72a0dd357dcb2 100644 --- a/FWCore/Framework/bin/cmsRun.cpp +++ b/FWCore/Framework/bin/cmsRun.cpp @@ -31,6 +31,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "TError.h" #include "boost/program_options.hpp" +#include "tbb/task_arena.h" #include #include @@ -263,6 +264,7 @@ int main(int argc, char* argv[]) { // // Finally, reflect the values being used in the "options" top level ParameterSet. context = "Setting up number of threads"; + unsigned int nThreads = 0; { // check the "options" ParameterSet std::shared_ptr pset = processDesc->getProcessPSet(); @@ -281,6 +283,7 @@ int main(int argc, char* argv[]) { threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) { threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); } + nThreads = threadsInfo.nThreads_; // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet setThreadOptions(threadsInfo, *pset); @@ -299,27 +302,30 @@ int main(int argc, char* argv[]) { edm::MessageDrop::instance()->jobMode = jobMode; } - context = "Constructing the EventProcessor"; - EventProcessorWithSentry procTmp( - std::make_unique(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides)); - proc = std::move(procTmp); - - alwaysAddContext = false; - context = "Calling beginJob"; - proc->beginJob(); - - alwaysAddContext = false; - context = - "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)"; - proc.on(); - auto status = proc->runToCompletion(); - if (status == edm::EventProcessor::epSignal) { - returnCode = edm::errors::CaughtSignal; - } - proc.off(); + tbb::task_arena arena(nThreads); + arena.execute([&]() { + context = "Constructing the EventProcessor"; + EventProcessorWithSentry procTmp( + std::make_unique(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides)); + proc = std::move(procTmp); + + alwaysAddContext = false; + context = "Calling beginJob"; + proc->beginJob(); + + alwaysAddContext = false; + context = + "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)"; + proc.on(); + auto status = proc->runToCompletion(); + if (status == edm::EventProcessor::epSignal) { + returnCode = edm::errors::CaughtSignal; + } + proc.off(); - context = "Calling endJob"; - proc->endJob(); + context = "Calling endJob"; + proc->endJob(); + }); return returnCode; }); } diff --git a/FWCore/Framework/bin/cmsRunPython3.cpp b/FWCore/Framework/bin/cmsRunPython3.cpp index e1a78a83a57b5..79a94ffc44a8a 100644 --- a/FWCore/Framework/bin/cmsRunPython3.cpp +++ b/FWCore/Framework/bin/cmsRunPython3.cpp @@ -263,6 +263,7 @@ int main(int argc, char* argv[]) { // // Finally, reflect the values being used in the "options" top level ParameterSet. context = "Setting up number of threads"; + unsigned int nThreads = 0; { // check the "options" ParameterSet std::shared_ptr pset = processDesc->getProcessPSet(); @@ -281,6 +282,7 @@ int main(int argc, char* argv[]) { threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) { threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); } + nThreads = threadsInfo.nThreads_; // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet setThreadOptions(threadsInfo, *pset); @@ -299,27 +301,30 @@ int main(int argc, char* argv[]) { edm::MessageDrop::instance()->jobMode = jobMode; } - context = "Constructing the EventProcessor"; - EventProcessorWithSentry procTmp( - std::make_unique(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides)); - proc = std::move(procTmp); - - alwaysAddContext = false; - context = "Calling beginJob"; - proc->beginJob(); - - alwaysAddContext = false; - context = - "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)"; - proc.on(); - auto status = proc->runToCompletion(); - if (status == edm::EventProcessor::epSignal) { - returnCode = edm::errors::CaughtSignal; - } - proc.off(); + tbb::task_arena arena(nThreads); + arena.execute([&]() { + context = "Constructing the EventProcessor"; + EventProcessorWithSentry procTmp( + std::make_unique(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides)); + proc = std::move(procTmp); + + alwaysAddContext = false; + context = "Calling beginJob"; + proc->beginJob(); + + alwaysAddContext = false; + context = + "Calling EventProcessor::runToCompletion (which does almost everything after beginJob and before endJob)"; + proc.on(); + auto status = proc->runToCompletion(); + if (status == edm::EventProcessor::epSignal) { + returnCode = edm::errors::CaughtSignal; + } + proc.off(); - context = "Calling endJob"; - proc->endJob(); + context = "Calling endJob"; + proc->endJob(); + }); return returnCode; }); } diff --git a/FWCore/Framework/test/stubs/TestNThreadsChecker.cc b/FWCore/Framework/test/stubs/TestNThreadsChecker.cc index 2c868f5395b33..8b6d8773d1fb1 100644 --- a/FWCore/Framework/test/stubs/TestNThreadsChecker.cc +++ b/FWCore/Framework/test/stubs/TestNThreadsChecker.cc @@ -20,7 +20,7 @@ #include #include #include -#include "tbb/task_scheduler_init.h" +#include "tbb/task_arena.h" // user include files #include "FWCore/ParameterSet/interface/ParameterSet.h" @@ -58,7 +58,7 @@ TestNThreadsChecker::TestNThreadsChecker(const edm::ParameterSet& iConfig, edm:: : m_nExpectedThreads(iConfig.getUntrackedParameter("nExpectedThreads")) { unsigned int expectedThreads = m_nExpectedThreads; if (expectedThreads == 0) { - expectedThreads = tbb::task_scheduler_init::default_num_threads(); + expectedThreads = tbb::this_task_arena::max_concurrency(); } //now do what ever initialization is needed diff --git a/FWCore/PythonFramework/src/PythonEventProcessor.cc b/FWCore/PythonFramework/src/PythonEventProcessor.cc index 0e80bcc677d3c..bf0448155a4ef 100644 --- a/FWCore/PythonFramework/src/PythonEventProcessor.cc +++ b/FWCore/PythonFramework/src/PythonEventProcessor.cc @@ -12,6 +12,7 @@ // system include files #include +#include "tbb/task_arena.h" // user include files #include "FWCore/PythonFramework/interface/PythonEventProcessor.h" @@ -45,6 +46,7 @@ namespace { //Only one ThreadsController can be active at a time CMS_THREAD_SAFE std::unique_ptr tsiPtr; + CMS_THREAD_SAFE int nThreads; std::shared_ptr setupThreading(std::shared_ptr iDesc) { // check the "options" ParameterSet @@ -94,7 +96,7 @@ PythonEventProcessor::~PythonEventProcessor() { void PythonEventProcessor::run() { auto gil = PyEval_SaveThread(); try { - (void)processor_.runToCompletion(); + tbb::task_arena{nThreads}.execute([this]() { (void)processor_.runToCompletion(); }); } catch (...) { } PyEval_RestoreThread(gil); diff --git a/FWCore/TestProcessor/interface/TestProcessor.h b/FWCore/TestProcessor/interface/TestProcessor.h index ee5f17fc2dfe1..d642609cdbfc0 100644 --- a/FWCore/TestProcessor/interface/TestProcessor.h +++ b/FWCore/TestProcessor/interface/TestProcessor.h @@ -22,6 +22,8 @@ #include #include #include +#include "tbb/global_control.h" +#include "tbb/task_arena.h" // user include files #include "FWCore/Framework/interface/SharedResourcesAcquirer.h" @@ -199,35 +201,43 @@ namespace edm { This simulates a problem happening early in the job which causes processing not to proceed. */ void testBeginAndEndJobOnly() { - beginJob(); - endJob(); + arena_.execute([this]() { + beginJob(); + endJob(); + }); } void testWithNoRuns() { - beginJob(); - beginProcessBlock(); - endProcessBlock(); - endJob(); + arena_.execute([this]() { + beginJob(); + beginProcessBlock(); + endProcessBlock(); + endJob(); + }); } void testRunWithNoLuminosityBlocks() { - beginJob(); - beginProcessBlock(); - beginRun(); - endRun(); - endProcessBlock(); - endJob(); + arena_.execute([this]() { + beginJob(); + beginProcessBlock(); + beginRun(); + endRun(); + endProcessBlock(); + endJob(); + }); } void testLuminosityBlockWithNoEvents() { - beginJob(); - beginProcessBlock(); - beginRun(); - beginLuminosityBlock(); - endLuminosityBlock(); - endRun(); - endProcessBlock(); - endJob(); + arena_.execute([this]() { + beginJob(); + beginProcessBlock(); + beginRun(); + beginLuminosityBlock(); + endLuminosityBlock(); + endRun(); + endProcessBlock(); + endJob(); + }); } void setRunNumber(edm::RunNumber_t); void setLuminosityBlockNumber(edm::LuminosityBlockNumber_t); @@ -313,6 +323,8 @@ This simulates a problem happening early in the job which causes processing not void endJob(); // ---------- member data -------------------------------- + tbb::global_control globalControl_; + tbb::task_arena arena_; std::string labelOfTestModule_; std::shared_ptr actReg_; // We do not use propagate_const because the registry itself is mutable. std::shared_ptr preg_; diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index a99b55fc79ef1..d6139b065e2a4 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -6,7 +6,7 @@ // Implementation: // [Notes on implementation] // -// Original Author: root +// Original Author: Chris Jones // Created: Mon, 30 Apr 2018 18:51:08 GMT // @@ -84,7 +84,9 @@ namespace edm { // constructors and destructor // TestProcessor::TestProcessor(Config const& iConfig, ServiceToken iToken) - : espController_(std::make_unique()), + : globalControl_(tbb::global_control::max_allowed_parallelism, 1), + arena_(1), + espController_(std::make_unique()), historyAppender_(std::make_unique()), moduleRegistry_(std::make_shared()) { //Setup various singletons @@ -214,10 +216,12 @@ namespace edm { } edm::test::Event TestProcessor::testImpl() { - setupProcessing(); - event(); + bool result = arena_.execute([this]() { + setupProcessing(); + event(); - bool result = schedule_->totalEventsPassed() > 0; + return schedule_->totalEventsPassed() > 0; + }); schedule_->clearCounters(); if (esHelper_) { //We want each test to have its own ES data products @@ -228,21 +232,23 @@ namespace edm { } edm::test::LuminosityBlock TestProcessor::testBeginLuminosityBlockImpl(edm::LuminosityBlockNumber_t iNum) { - if (not beginJobCalled_) { - beginJob(); - } - if (not beginProcessBlockCalled_) { - beginProcessBlock(); - } - if (not beginRunCalled_) { - beginRun(); - } - if (beginLumiCalled_) { - endLuminosityBlock(); - assert(lumiNumber_ != iNum); - } - lumiNumber_ = iNum; - beginLuminosityBlock(); + arena_.execute([this, iNum]() { + if (not beginJobCalled_) { + beginJob(); + } + if (not beginProcessBlockCalled_) { + beginProcessBlock(); + } + if (not beginRunCalled_) { + beginRun(); + } + if (beginLumiCalled_) { + endLuminosityBlock(); + assert(lumiNumber_ != iNum); + } + lumiNumber_ = iNum; + beginLuminosityBlock(); + }); if (esHelper_) { //We want each test to have its own ES data products @@ -253,20 +259,21 @@ namespace edm { } edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() { - if (not beginJobCalled_) { - beginJob(); - } - if (not beginProcessBlockCalled_) { - beginProcessBlock(); - } - if (not beginRunCalled_) { - beginRun(); - } - if (not beginLumiCalled_) { - beginLuminosityBlock(); - } - auto lumi = endLuminosityBlock(); - + auto lumi = arena_.execute([this]() { + if (not beginJobCalled_) { + beginJob(); + } + if (not beginProcessBlockCalled_) { + beginProcessBlock(); + } + if (not beginRunCalled_) { + beginRun(); + } + if (not beginLumiCalled_) { + beginLuminosityBlock(); + } + return endLuminosityBlock(); + }); if (esHelper_) { //We want each test to have its own ES data products esHelper_->resetAllProxies(); @@ -276,19 +283,20 @@ namespace edm { } edm::test::Run TestProcessor::testBeginRunImpl(edm::RunNumber_t iNum) { - if (not beginJobCalled_) { - beginJob(); - } - if (not beginProcessBlockCalled_) { - beginProcessBlock(); - } - if (beginRunCalled_) { - assert(runNumber_ != iNum); - endRun(); - } - runNumber_ = iNum; - beginRun(); - + arena_.execute([this, iNum]() { + if (not beginJobCalled_) { + beginJob(); + } + if (not beginProcessBlockCalled_) { + beginProcessBlock(); + } + if (beginRunCalled_) { + assert(runNumber_ != iNum); + endRun(); + } + runNumber_ = iNum; + beginRun(); + }); if (esHelper_) { //We want each test to have its own ES data products esHelper_->resetAllProxies(); @@ -298,17 +306,18 @@ namespace edm { principalCache_.runPrincipalPtr(), labelOfTestModule_, processConfiguration_->processName()); } edm::test::Run TestProcessor::testEndRunImpl() { - if (not beginJobCalled_) { - beginJob(); - } - if (not beginProcessBlockCalled_) { - beginProcessBlock(); - } - if (not beginRunCalled_) { - beginRun(); - } - auto rp = endRun(); - + auto rp = arena_.execute([this]() { + if (not beginJobCalled_) { + beginJob(); + } + if (not beginProcessBlockCalled_) { + beginProcessBlock(); + } + if (not beginRunCalled_) { + beginRun(); + } + return endRun(); + }); if (esHelper_) { //We want each test to have its own ES data products esHelper_->resetAllProxies(); @@ -318,21 +327,25 @@ namespace edm { } edm::test::ProcessBlock TestProcessor::testBeginProcessBlockImpl() { - if (not beginJobCalled_) { - beginJob(); - } - beginProcessBlock(); + arena_.execute([this]() { + if (not beginJobCalled_) { + beginJob(); + } + beginProcessBlock(); + }); return edm::test::ProcessBlock( &principalCache_.processBlockPrincipal(), labelOfTestModule_, processConfiguration_->processName()); } edm::test::ProcessBlock TestProcessor::testEndProcessBlockImpl() { - if (not beginJobCalled_) { - beginJob(); - } - if (not beginProcessBlockCalled_) { - beginProcessBlock(); - } - auto pbp = endProcessBlock(); + auto pbp = arena_.execute([this]() { + if (not beginJobCalled_) { + beginJob(); + } + if (not beginProcessBlockCalled_) { + beginProcessBlock(); + } + return endProcessBlock(); + }); return edm::test::ProcessBlock(pbp, labelOfTestModule_, processConfiguration_->processName()); } @@ -352,21 +365,23 @@ namespace edm { } void TestProcessor::teardownProcessing() { - if (beginLumiCalled_) { - endLuminosityBlock(); - beginLumiCalled_ = false; - } - if (beginRunCalled_) { - endRun(); - beginRunCalled_ = false; - } - if (beginProcessBlockCalled_) { - endProcessBlock(); - beginProcessBlockCalled_ = false; - } - if (beginJobCalled_) { - endJob(); - } + arena_.execute([this]() { + if (beginLumiCalled_) { + endLuminosityBlock(); + beginLumiCalled_ = false; + } + if (beginRunCalled_) { + endRun(); + beginRunCalled_ = false; + } + if (beginProcessBlockCalled_) { + endProcessBlock(); + beginProcessBlockCalled_ = false; + } + if (beginJobCalled_) { + endJob(); + } + }); } void TestProcessor::beginJob() { From b540c6b2c560059a3f48142f62e8b566bdf6ce8b Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 1 Sep 2020 09:18:07 -0500 Subject: [PATCH 05/10] Removed unused EventProcessor calls --- CondFormats/CSCObjects/test/testCSCMapping.cpp | 16 ---------------- .../CSCObjects/test/testCSCTriggerMapping.cpp | 16 ---------------- 2 files changed, 32 deletions(-) diff --git a/CondFormats/CSCObjects/test/testCSCMapping.cpp b/CondFormats/CSCObjects/test/testCSCMapping.cpp index e2e9a422f4c84..61d3b58660219 100644 --- a/CondFormats/CSCObjects/test/testCSCMapping.cpp +++ b/CondFormats/CSCObjects/test/testCSCMapping.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include #include "FWCore/ParameterSetReader/interface/ParameterSetReader.h" @@ -52,27 +51,12 @@ class testCSCMapping : public CppUnit::TestFixture { void testRead(); - int runIt(const std::string& config); - private: const std::string myName_; const int dashedLineWidth; std::string dashedLine; }; -int testCSCMapping::runIt(const std::string& config) { - edm::AssertHandler ah; - int rc = 0; - try { - edm::EventProcessor proc(edm::getPSetFromConfig(config)); - proc.run(); - } catch (cms::Exception& e) { - std::cerr << "Exception caught: " << e.what() << std::endl; - rc = 1; - } - return rc; -} - void testCSCMapping::testRead() { edm::FileInPath fip("CondFormats/CSCObjects/data/csc_slice_test_map.txt"); std::cout << "Attempt to set FileInPath to " << fip.fullPath() << std::endl; diff --git a/CondFormats/CSCObjects/test/testCSCTriggerMapping.cpp b/CondFormats/CSCObjects/test/testCSCTriggerMapping.cpp index ce0767f33ee27..f3500751459da 100644 --- a/CondFormats/CSCObjects/test/testCSCTriggerMapping.cpp +++ b/CondFormats/CSCObjects/test/testCSCTriggerMapping.cpp @@ -6,7 +6,6 @@ #include #include -#include #include "FWCore/PluginManager/interface/ProblemTracker.h" #include #include @@ -42,27 +41,12 @@ class testCSCTriggerMapping : public CppUnit::TestFixture { void testRead(); - int runIt(const std::string& config); - private: const std::string myName_; const int dashedLineWidth; std::string dashedLine; }; -int testCSCTriggerMapping::runIt(const std::string& config) { - edm::AssertHandler ah; - int rc = 0; - try { - edm::EventProcessor proc(edm::getPSetFromConfig(config)); - proc.run(); - } catch (cms::Exception& e) { - std::cerr << "Exception caught: " << e.what() << std::endl; - rc = 1; - } - return rc; -} - void testCSCTriggerMapping::testRead() { std::cout << myName_ << ": --- t e s t C S C T r i g g e r M a p p i n g ---" << std::endl; std::cout << "start " << dashedLine << std::endl; From 64122db6952d3542886b28f2647eee55d7210c00 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 1 Sep 2020 09:28:19 -0500 Subject: [PATCH 06/10] Removed unused file testDTUnpackingModule.cc The file is never built, was directly using edm::EventProcessor and using the old no-longer-supported configuration language syntax. --- .../DTRawToDigi/test/testDTUnpackingModule.cc | 113 ------------------ 1 file changed, 113 deletions(-) delete mode 100644 EventFilter/DTRawToDigi/test/testDTUnpackingModule.cc diff --git a/EventFilter/DTRawToDigi/test/testDTUnpackingModule.cc b/EventFilter/DTRawToDigi/test/testDTUnpackingModule.cc deleted file mode 100644 index 4d7d0743aed8c..0000000000000 --- a/EventFilter/DTRawToDigi/test/testDTUnpackingModule.cc +++ /dev/null @@ -1,113 +0,0 @@ -/** - \file - Test suite for DTUnpackingModule - - \author Stefano ARGIRO - \date 29 Jun 2005 - - \note these tests are not testing anything but the thing not crashing - -*/ - -#include -#include -#include "FWCore/PluginManager/interface/ProblemTracker.h" -#include "FWCore/Utilities/interface/Exception.h" -#include -#include - -using namespace std; - -string releasetop(std::getenv("SCRAMRT_LOCALRT")); -string testfileLocation = releasetop + "/src/EventFilter/DTRawToDigi/test/"; - -class testDTUnpackingModule : public CppUnit::TestFixture { - CPPUNIT_TEST_SUITE(testDTUnpackingModule); - - CPPUNIT_TEST(testUnpacker); - CPPUNIT_TEST(testPoolIO); - - CPPUNIT_TEST_SUITE_END(); - -public: - void setUp() { - char* ret = std::getenv("SCRAMRT_LOCALRT"); - if (!ret) { - cerr << "env variable SCRAMRT_LOCALRT not set, try eval `scramv1 runt -csh`" << endl; - exit(1); - } - } - - void tearDown() {} - void testUnpacker(); - void writeOut(); - void testPoolIO(); - int runIt(const std::string& config); -}; - -///registration of the test so that the runner can find it -CPPUNIT_TEST_SUITE_REGISTRATION(testDTUnpackingModule); - -int testDTUnpackingModule::runIt(const std::string& config) { - edm::AssertHandler ah; - int rc = 0; - try { - edm::EventProcessor proc(config); - proc.run(); - } catch (seal::Error& e) { - std::cerr << "Exception caught: " << e.explainSelf() << std::endl; - rc = 1; - } - return rc; -} - -void testDTUnpackingModule::testUnpacker() { - const std::string config = - "process TEST = { \n" - "module dtunpacker = DTUnpackingModule{ }\n" - "module hit = DummyHitFinderModule{ }\n" - "path p = {dtunpacker, hit}\n" - "source = DAQFileInputService{ string fileName =\"" + - testfileLocation + "dtraw.raw" + - "\"\n" - " untracked int32 maxEvents = 1 }\n" - "}\n"; - - int rc = runIt(config); - CPPUNIT_ASSERT(rc == 0); -} - -void testDTUnpackingModule::writeOut() { - const std::string config = - "process TEST = { \n" - "module dtunpacker = DTUnpackingModule{ }\n" - "module out = PoolOutputModule {\n" - " untracked string fileName =\"" + - testfileLocation + "dtdigis.root" + - "\"} \n" - "path p = {dtunpacker, out}\n" - "source = DAQFileInputService{ string fileName =\"" + - testfileLocation + "dtraw.raw" + - "\"\n" - " untracked int32 maxEvents = 1 }\n" - "}\n"; - - int rc = runIt(config); - CPPUNIT_ASSERT(rc == 0); -} - -void testDTUnpackingModule::testPoolIO() { - writeOut(); - - const std::string config = - "process TEST = { \n" - " module hit = DummyHitFinderModule{ }\n" - " path p = {hit}\n" - " source = PoolSource{ string fileName =\"" + - testfileLocation + "dtdigis.root" + - "\"} \n" - "}\n"; - - int rc = runIt(config); - CPPUNIT_ASSERT(rc == 0); -} From 8ee085c108cc7537678edc01b3d7b6fd029d9a6e Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 1 Sep 2020 09:41:30 -0500 Subject: [PATCH 07/10] Use cmsRun directly for test Avoid direct use of EventProcessor and instead just have test use cmsRun. --- EventFilter/CSCTFRawToDigi/test/BuildFile.xml | 6 +- .../test/testCSCTFRawToDigi.cpp | 91 ------------------- .../test/testCSCTFRawToDigi_cfg.py | 22 +++++ 3 files changed, 23 insertions(+), 96 deletions(-) delete mode 100644 EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi.cpp create mode 100644 EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi_cfg.py diff --git a/EventFilter/CSCTFRawToDigi/test/BuildFile.xml b/EventFilter/CSCTFRawToDigi/test/BuildFile.xml index 5a0aa458c6e72..1fbc304e229ba 100644 --- a/EventFilter/CSCTFRawToDigi/test/BuildFile.xml +++ b/EventFilter/CSCTFRawToDigi/test/BuildFile.xml @@ -8,8 +8,4 @@ - - - - - + diff --git a/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi.cpp b/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi.cpp deleted file mode 100644 index b47dc4061f38e..0000000000000 --- a/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* \file testCSCTFRawToDigi.cc - * - * \author L. Gray , ripped from testDaqSource - */ - -#include -#include -#include -#include -#include -#include "FWCore/ParameterSetReader/interface/ParameterSetReader.h" -#include -#include - -using namespace std; - -string releasetop(getenv("CMSSW_BASE")); -string testfileLocation = releasetop + "/src/EventFilter/CSCTFRawToDigi/test/"; - -class testCSCTFRawToDigi : public CppUnit::TestFixture { - CPPUNIT_TEST_SUITE(testCSCTFRawToDigi); - - // Test generating digis from raw data - CPPUNIT_TEST(testCreateDigis); - - CPPUNIT_TEST_SUITE_END(); - -public: - void setUp() { - char* ret = getenv("CMSSW_BASE"); - if (!ret) { - cerr << "env variable SCRAMRT_LOCALRT not set, try eval `scramv1 runt -csh`" << endl; - exit(1); - } - } - - void tearDown() {} - - void testCreateDigis(); - - int runIt(const std::string& config); -}; - -int testCSCTFRawToDigi::runIt(const std::string& config) { - edm::AssertHandler ah; - int rc = 0; - try { - edm::EventProcessor proc(edm::getPSetFromConfig(config)); - proc.run(); - } catch (cms::Exception& e) { - std::cerr << "Exception caught: " << e.explainSelf() << std::endl; - rc = 1; - } - return rc; -} - -// Read raw data from a file -void testCSCTFRawToDigi::testCreateDigis() { - cout << endl << endl << " ---- testCSCTFRawToDigi::testCreateDigis ---- " << endl << endl; - - const std::string config = - "import FWCore.ParameterSet.Config as cms \n" - "process = cms.Process(\"analyzer\") \n" - "process.load(\"EventFilter.CSCTFRawToDigi.csctfunpacker_cfi\") \n" - "process.load(\"EventFilter.CSCTFRawToDigi.csctfpacker_cfi\") \n" - "process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(128) )\n" - "process.load(\"FWCore.MessageLogger.MessageLogger_cfi\") \n" - "process.MessageLogger.cout.placeholder = cms.untracked.bool(False) \n" - "process.MessageLogger.cout.threshold = cms.untracked.string('INFO') \n" - "process.MessageLogger.debugModules = cms.untracked.vstring('*') \n" - "process.source = cms.Source(\"EmptySource\") \n" - "process.csctfsinglegen = cms.EDProducer(\"CSCTFSingleGen\") \n" - "process.csctfpacker.lctProducer = cms.InputTag(\"csctfsinglegen:\")\n" - "process.csctfpacker.mbProducer = cms.InputTag(\"null:\") \n" - "process.csctfpacker.trackProducer = cms.InputTag(\"null:\") \n" - "process.csctfunpacker.producer = cms.InputTag(\"csctfpacker\",\"CSCTFRawData\")\n" - "process.csctfanalyzer = cms.EDAnalyzer(\"CSCTFAnalyzer\", \n" - " mbProducer = cms.untracked.InputTag(\"csctfunpacker:DT\"), \n" - " lctProducer = cms.untracked.InputTag(\"csctfunpacker:\"), \n" - " trackProducer = cms.untracked.InputTag(\"csctfunpacker:\"), \n" - " statusProducer = cms.untracked.InputTag(\"csctfunpacker:\") \n" - ") \n" - "process.p = cms.Path(process.csctfsinglegen*process.csctfpacker*process.csctfunpacker*process.csctfanalyzer) \n"; - - int rc = runIt(config); - CPPUNIT_ASSERT(rc == 0); -} - -///registration of the test so that the runner can find it -CPPUNIT_TEST_SUITE_REGISTRATION(testCSCTFRawToDigi); -#include diff --git a/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi_cfg.py b/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi_cfg.py new file mode 100644 index 0000000000000..fd2db13aee393 --- /dev/null +++ b/EventFilter/CSCTFRawToDigi/test/testCSCTFRawToDigi_cfg.py @@ -0,0 +1,22 @@ +import FWCore.ParameterSet.Config as cms +process = cms.Process('analyzer') +process.load('EventFilter.CSCTFRawToDigi.csctfunpacker_cfi') +process.load('EventFilter.CSCTFRawToDigi.csctfpacker_cfi') +process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(128) ) +process.load('FWCore.MessageLogger.MessageLogger_cfi') +process.MessageLogger.cout.placeholder = cms.untracked.bool(False) +process.MessageLogger.cout.threshold = cms.untracked.string('INFO') +process.MessageLogger.debugModules = cms.untracked.vstring('*') +process.source = cms.Source('EmptySource') +process.csctfsinglegen = cms.EDProducer('CSCTFSingleGen') +process.csctfpacker.lctProducer = cms.InputTag('csctfsinglegen:') +process.csctfpacker.mbProducer = cms.InputTag('null:') +process.csctfpacker.trackProducer = cms.InputTag('null:') +process.csctfunpacker.producer = cms.InputTag('csctfpacker','CSCTFRawData') +process.csctfanalyzer = cms.EDAnalyzer('CSCTFAnalyzer', + mbProducer = cms.untracked.InputTag('csctfunpacker:DT'), + lctProducer = cms.untracked.InputTag('csctfunpacker:'), + trackProducer = cms.untracked.InputTag('csctfunpacker:'), + statusProducer = cms.untracked.InputTag('csctfunpacker:') + ) +process.p = cms.Path(process.csctfsinglegen*process.csctfpacker*process.csctfunpacker*process.csctfanalyzer) From ab1c866205d23e8c562fb9e4cba838496075524a Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 1 Sep 2020 14:22:08 -0500 Subject: [PATCH 08/10] Fix crash in TestProcessor when exception occurred Valgrind showed a double delete of share_ptr when exception happened in endRun(). Switching to using lambda capture instead of return value from functor avoids the problem. --- FWCore/TestProcessor/src/TestProcessor.cc | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index d6139b065e2a4..fab9a970368b9 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -259,7 +259,10 @@ namespace edm { } edm::test::LuminosityBlock TestProcessor::testEndLuminosityBlockImpl() { - auto lumi = arena_.execute([this]() { + //using a return value from arena_.execute lead to double delete of shared_ptr + // based on valgrind output when exception occurred. Use lambda capture instead. + std::shared_ptr lumi; + arena_.execute([this, &lumi]() { if (not beginJobCalled_) { beginJob(); } @@ -272,7 +275,7 @@ namespace edm { if (not beginLumiCalled_) { beginLuminosityBlock(); } - return endLuminosityBlock(); + lumi = endLuminosityBlock(); }); if (esHelper_) { //We want each test to have its own ES data products @@ -306,7 +309,10 @@ namespace edm { principalCache_.runPrincipalPtr(), labelOfTestModule_, processConfiguration_->processName()); } edm::test::Run TestProcessor::testEndRunImpl() { - auto rp = arena_.execute([this]() { + //using a return value from arena_.execute lead to double delete of shared_ptr + // based on valgrind output when exception occurred. Use lambda capture instead. + std::shared_ptr rp; + arena_.execute([this, &rp]() { if (not beginJobCalled_) { beginJob(); } @@ -316,7 +322,7 @@ namespace edm { if (not beginRunCalled_) { beginRun(); } - return endRun(); + rp = endRun(); }); if (esHelper_) { //We want each test to have its own ES data products From cb40a2f421fdc342ae3b42bf6769fbc0e3cd6dca Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Thu, 3 Sep 2020 11:37:17 -0500 Subject: [PATCH 09/10] properly set number of threads for PythonEventProcessor --- FWCore/PythonFramework/src/PythonEventProcessor.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/FWCore/PythonFramework/src/PythonEventProcessor.cc b/FWCore/PythonFramework/src/PythonEventProcessor.cc index bf0448155a4ef..47d9c39bb046c 100644 --- a/FWCore/PythonFramework/src/PythonEventProcessor.cc +++ b/FWCore/PythonFramework/src/PythonEventProcessor.cc @@ -54,6 +54,7 @@ namespace { auto threadsInfo = threadOptions(*pset); threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); + nThreads = threadsInfo.nThreads_; // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet setThreadOptions(threadsInfo, *pset); From c645faeb4072ada81c51bdb43e2b82df2015b932 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Thu, 3 Sep 2020 11:37:44 -0500 Subject: [PATCH 10/10] Added comment about ThreadsController lifetime requirements --- FWCore/Concurrency/interface/setNThreads.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/FWCore/Concurrency/interface/setNThreads.h b/FWCore/Concurrency/interface/setNThreads.h index dc6a616de1bc6..89677c90d3f04 100644 --- a/FWCore/Concurrency/interface/setNThreads.h +++ b/FWCore/Concurrency/interface/setNThreads.h @@ -10,6 +10,8 @@ #include "FWCore/Concurrency/interface/ThreadsController.h" namespace edm { + //This guarantees that the previous ThreadsController is destroyed before a new one starts + // At one time certain TBB control elements required such behavior. unsigned int setNThreads(unsigned int iNThreads, unsigned int iStackSize, std::unique_ptr& oPtr); -} +} // namespace edm #endif /* FWCore_Concurrency_setNThreads_h */