diff --git a/FWCore/Concurrency/interface/setNThreads.h b/FWCore/Concurrency/interface/setNThreads.h new file mode 100644 index 0000000000000..7ac0acfd2ecfd --- /dev/null +++ b/FWCore/Concurrency/interface/setNThreads.h @@ -0,0 +1,17 @@ +#ifndef FWCore_Concurrency_setNThreads_h +#define FWCore_Concurrency_setNThreads_h +// +// setNThreads.h +// CMSSW +// +// Created by Chris Jones on 7/24/20. +// +#include +#include "tbb/task_scheduler_init.h" + +namespace edm { + unsigned int setNThreads(unsigned int iNThreads, + unsigned int iStackSize, + std::unique_ptr& oPtr); +} +#endif /* FWCore_Concurrency_setNThreads_h */ diff --git a/FWCore/Concurrency/src/setNThreads.cc b/FWCore/Concurrency/src/setNThreads.cc new file mode 100644 index 0000000000000..cfa96ab5823c6 --- /dev/null +++ b/FWCore/Concurrency/src/setNThreads.cc @@ -0,0 +1,32 @@ +// +// setNThreads.cc +// CMSSW +// +// Created by Chris Jones on 7/24/20. +// + +#include "FWCore/Concurrency/interface/setNThreads.h" + +namespace edm { + unsigned int setNThreads(unsigned int iNThreads, + unsigned int iStackSize, + std::unique_ptr& oPtr) { + //The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's + // destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads. + // This provides a clean synchronization point. + //We have to destroy the old scheduler before starting a new one in order to + // get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads + + //stack size is given in KB but passed in as bytes + iStackSize *= 1024; + + oPtr.reset(); + if (0 == iNThreads) { + //Allow TBB to decide how many threads. This is normally the number of CPUs in the machine. + iNThreads = tbb::task_scheduler_init::default_num_threads(); + } + oPtr = std::make_unique(static_cast(iNThreads), iStackSize); + + return iNThreads; + } +} // namespace edm diff --git a/FWCore/Framework/bin/cmsRun.cpp b/FWCore/Framework/bin/cmsRun.cpp index 1c65206f27a22..0fb5876fd4132 100644 --- a/FWCore/Framework/bin/cmsRun.cpp +++ b/FWCore/Framework/bin/cmsRun.cpp @@ -11,7 +11,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ProcessDesc.h" -#include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h" +#include "FWCore/ParameterSet/interface/ThreadsInfo.h" #include "FWCore/PluginManager/interface/PluginManager.h" #include "FWCore/PluginManager/interface/PresenceFactory.h" #include "FWCore/PluginManager/interface/standard.h" @@ -19,6 +19,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/ServiceRegistry/interface/ServiceToken.h" #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h" +#include "FWCore/Concurrency/interface/setNThreads.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/ConvertException.h" @@ -58,7 +59,6 @@ static char const* const kHelpOpt = "help"; static char const* const kHelpCommandOpt = "help,h"; static char const* const kStrictOpt = "strict"; -constexpr unsigned int kDefaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB // ----------------------------------------------- namespace { class EventProcessorWithSentry { @@ -92,27 +92,6 @@ namespace { bool callEndJob_; }; - unsigned int setNThreads(unsigned int iNThreads, - unsigned int iStackSize, - std::unique_ptr& oPtr) { - //The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's - // destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads. - // This provides a clean synchronization point. - //We have to destroy the old scheduler before starting a new one in order to - // get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads - - //stack size is given in KB but passed in as bytes - iStackSize *= 1024; - - oPtr.reset(); - if (0 == iNThreads) { - //Allow TBB to decide how many threads. This is normally the number of CPUs in the machine. - iNThreads = tbb::task_scheduler_init::default_num_threads(); - } - oPtr = std::make_unique(static_cast(iNThreads), iStackSize); - - return iNThreads; - } } // namespace int main(int argc, char* argv[]) { @@ -126,7 +105,7 @@ int main(int argc, char* argv[]) { //NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job // else we get a crash. So for now we can't have any services use tasks in their constructors. std::unique_ptr tsiPtr = std::make_unique( - edm::s_defaultNumberOfThreads, kDefaultSizeOfStackForThreadsInKB * 1024); + edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024); std::shared_ptr theMessageServicePresence; std::unique_ptr jobReportStreamPtr; std::shared_ptr > jobRep; @@ -287,43 +266,26 @@ int main(int argc, char* argv[]) { // Finally, reflect the values being used in the "options" top level ParameterSet. context = "Setting up number of threads"; { - // default values - unsigned int nThreads = edm::s_defaultNumberOfThreads; - unsigned int stackSize = kDefaultSizeOfStackForThreadsInKB; - // check the "options" ParameterSet std::shared_ptr pset = processDesc->getProcessPSet(); - if (pset->existsAs("options", false)) { - auto const& ops = pset->getUntrackedParameterSet("options"); - if (ops.existsAs("numberOfThreads", false)) { - nThreads = ops.getUntrackedParameter("numberOfThreads"); - } - if (ops.existsAs("sizeOfStackForThreadsInKB", false)) { - stackSize = ops.getUntrackedParameter("sizeOfStackForThreadsInKB"); - } - } + auto threadsInfo = threadOptions(*pset); // check the command line options if (vm.count(kNumberOfThreadsOpt)) { - nThreads = vm[kNumberOfThreadsOpt].as(); + threadsInfo.nThreads_ = vm[kNumberOfThreadsOpt].as(); } if (vm.count(kSizeOfStackForThreadOpt)) { - stackSize = vm[kSizeOfStackForThreadOpt].as(); + threadsInfo.stackSize_ = vm[kSizeOfStackForThreadOpt].as(); } // if needed, re-initialise TBB - if (nThreads != edm::s_defaultNumberOfThreads or stackSize != kDefaultSizeOfStackForThreadsInKB) { - nThreads = setNThreads(nThreads, stackSize, tsiPtr); + if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or + threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) { + threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); } // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet - edm::ParameterSet newOp; - if (pset->existsAs("options", false)) { - newOp = pset->getUntrackedParameterSet("options"); - } - newOp.addUntrackedParameter("numberOfThreads", nThreads); - newOp.addUntrackedParameter("sizeOfStackForThreadsInKB", stackSize); - pset->insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false)); + setThreadOptions(threadsInfo, *pset); } context = "Initializing default service configurations"; diff --git a/FWCore/Framework/bin/cmsRunPython3.cpp b/FWCore/Framework/bin/cmsRunPython3.cpp index cc372c6db5d3b..7bc4ed6b25e1d 100644 --- a/FWCore/Framework/bin/cmsRunPython3.cpp +++ b/FWCore/Framework/bin/cmsRunPython3.cpp @@ -11,7 +11,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ProcessDesc.h" -#include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h" +#include "FWCore/ParameterSet/interface/ThreadsInfo.h" #include "FWCore/PluginManager/interface/PluginManager.h" #include "FWCore/PluginManager/interface/PresenceFactory.h" #include "FWCore/PluginManager/interface/standard.h" @@ -19,6 +19,7 @@ PSet script. See notes in EventProcessor.cpp for details about it. #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/ServiceRegistry/interface/ServiceToken.h" #include "FWCore/ServiceRegistry/interface/ServiceWrapper.h" +#include "FWCore/Concurrency/interface/setNThreads.h" #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/ConvertException.h" @@ -58,7 +59,6 @@ static char const* const kHelpOpt = "help"; static char const* const kHelpCommandOpt = "help,h"; static char const* const kStrictOpt = "strict"; -constexpr unsigned int kDefaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB // ----------------------------------------------- namespace { class EventProcessorWithSentry { @@ -92,27 +92,6 @@ namespace { bool callEndJob_; }; - unsigned int setNThreads(unsigned int iNThreads, - unsigned int iStackSize, - std::unique_ptr& oPtr) { - //The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's - // destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads. - // This provides a clean synchronization point. - //We have to destroy the old scheduler before starting a new one in order to - // get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads - - //stack size is given in KB but passed in as bytes - iStackSize *= 1024; - - oPtr.reset(); - if (0 == iNThreads) { - //Allow TBB to decide how many threads. This is normally the number of CPUs in the machine. - iNThreads = tbb::task_scheduler_init::default_num_threads(); - } - oPtr = std::make_unique(static_cast(iNThreads), iStackSize); - - return iNThreads; - } } // namespace int main(int argc, char* argv[]) { @@ -126,7 +105,7 @@ int main(int argc, char* argv[]) { //NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job // else we get a crash. So for now we can't have any services use tasks in their constructors. std::unique_ptr tsiPtr = std::make_unique( - edm::s_defaultNumberOfThreads, kDefaultSizeOfStackForThreadsInKB * 1024); + edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024); std::shared_ptr theMessageServicePresence; std::unique_ptr jobReportStreamPtr; std::shared_ptr > jobRep; @@ -287,43 +266,26 @@ int main(int argc, char* argv[]) { // Finally, reflect the values being used in the "options" top level ParameterSet. context = "Setting up number of threads"; { - // default values - unsigned int nThreads = edm::s_defaultNumberOfThreads; - unsigned int stackSize = kDefaultSizeOfStackForThreadsInKB; - // check the "options" ParameterSet std::shared_ptr pset = processDesc->getProcessPSet(); - if (pset->existsAs("options", false)) { - auto const& ops = pset->getUntrackedParameterSet("options"); - if (ops.existsAs("numberOfThreads", false)) { - nThreads = ops.getUntrackedParameter("numberOfThreads"); - } - if (ops.existsAs("sizeOfStackForThreadsInKB", false)) { - stackSize = ops.getUntrackedParameter("sizeOfStackForThreadsInKB"); - } - } + auto threadsInfo = threadOptions(*pset); // check the command line options if (vm.count(kNumberOfThreadsOpt)) { - nThreads = vm[kNumberOfThreadsOpt].as(); + threadsInfo.nThreads_ = vm[kNumberOfThreadsOpt].as(); } if (vm.count(kSizeOfStackForThreadOpt)) { - stackSize = vm[kSizeOfStackForThreadOpt].as(); + threadsInfo.stackSize_ = vm[kSizeOfStackForThreadOpt].as(); } // if needed, re-initialise TBB - if (nThreads != edm::s_defaultNumberOfThreads or stackSize != kDefaultSizeOfStackForThreadsInKB) { - nThreads = setNThreads(nThreads, stackSize, tsiPtr); + if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or + threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) { + threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); } // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet - edm::ParameterSet newOp; - if (pset->existsAs("options", false)) { - newOp = pset->getUntrackedParameterSet("options"); - } - newOp.addUntrackedParameter("numberOfThreads", nThreads); - newOp.addUntrackedParameter("sizeOfStackForThreadsInKB", stackSize); - pset->insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false)); + setThreadOptions(threadsInfo, *pset); } context = "Initializing default service configurations"; diff --git a/FWCore/ParameterSet/interface/ThreadsInfo.h b/FWCore/ParameterSet/interface/ThreadsInfo.h new file mode 100644 index 0000000000000..7970487bee331 --- /dev/null +++ b/FWCore/ParameterSet/interface/ThreadsInfo.h @@ -0,0 +1,21 @@ +#ifndef FWCore_ParameterSet_ThreadsInfo_h +#define FWCore_ParameterSet_ThreadsInfo_h + +namespace edm { + + class ParameterSet; + class ParameterSetDescription; + + constexpr unsigned int s_defaultNumberOfThreads = 1; + constexpr unsigned int s_defaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB + + struct ThreadsInfo { + unsigned int nThreads_ = s_defaultNumberOfThreads; + unsigned int stackSize_ = s_defaultSizeOfStackForThreadsInKB; + }; + + ThreadsInfo threadOptions(edm::ParameterSet const& pset); + void setThreadOptions(ThreadsInfo const& threadsInfo, edm::ParameterSet& pset); + +} // namespace edm +#endif diff --git a/FWCore/ParameterSet/interface/validateTopLevelParameterSets.h b/FWCore/ParameterSet/interface/validateTopLevelParameterSets.h index bf867fa4c1418..25b1b4c2621ae 100644 --- a/FWCore/ParameterSet/interface/validateTopLevelParameterSets.h +++ b/FWCore/ParameterSet/interface/validateTopLevelParameterSets.h @@ -6,8 +6,6 @@ namespace edm { class ParameterSet; class ParameterSetDescription; - constexpr unsigned int s_defaultNumberOfThreads = 1; - void validateTopLevelParameterSets(ParameterSet* processParameterSet); void fillOptionsDescription(ParameterSetDescription& description); void fillMaxEventsDescription(ParameterSetDescription& description); diff --git a/FWCore/ParameterSet/src/ThreadsInfo.cc b/FWCore/ParameterSet/src/ThreadsInfo.cc new file mode 100644 index 0000000000000..a8a44adc55689 --- /dev/null +++ b/FWCore/ParameterSet/src/ThreadsInfo.cc @@ -0,0 +1,36 @@ +// +// ThreadsInfo.cc +// CMSSW +// +// Created by Chris Jones on 7/24/20. +// +#include "FWCore/ParameterSet/interface/ThreadsInfo.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" + +namespace edm { + ThreadsInfo threadOptions(edm::ParameterSet const& pset) { + // default values + ThreadsInfo threadsInfo; + + if (pset.existsAs("options", false)) { + auto const& ops = pset.getUntrackedParameterSet("options"); + if (ops.existsAs("numberOfThreads", false)) { + threadsInfo.nThreads_ = ops.getUntrackedParameter("numberOfThreads"); + } + if (ops.existsAs("sizeOfStackForThreadsInKB", false)) { + threadsInfo.stackSize_ = ops.getUntrackedParameter("sizeOfStackForThreadsInKB"); + } + } + return threadsInfo; + } + + void setThreadOptions(ThreadsInfo const& threadsInfo, edm::ParameterSet& pset) { + edm::ParameterSet newOp; + if (pset.existsAs("options", false)) { + newOp = pset.getUntrackedParameterSet("options"); + } + newOp.addUntrackedParameter("numberOfThreads", threadsInfo.nThreads_); + newOp.addUntrackedParameter("sizeOfStackForThreadsInKB", threadsInfo.stackSize_); + pset.insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false)); + } +} // namespace edm diff --git a/FWCore/ParameterSet/src/validateTopLevelParameterSets.cc b/FWCore/ParameterSet/src/validateTopLevelParameterSets.cc index 2d7ab22a52e9b..e512232fbb042 100644 --- a/FWCore/ParameterSet/src/validateTopLevelParameterSets.cc +++ b/FWCore/ParameterSet/src/validateTopLevelParameterSets.cc @@ -2,6 +2,7 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/ThreadsInfo.h" #include "FWCore/Utilities/interface/EDMException.h" diff --git a/FWCore/PythonFramework/src/PythonEventProcessor.cc b/FWCore/PythonFramework/src/PythonEventProcessor.cc index 95d8e4369fecf..016ffdea19103 100644 --- a/FWCore/PythonFramework/src/PythonEventProcessor.cc +++ b/FWCore/PythonFramework/src/PythonEventProcessor.cc @@ -19,6 +19,7 @@ #include "FWCore/Framework/interface/defaultCmsRunServices.h" #include "FWCore/ParameterSet/interface/ProcessDesc.h" +#include "FWCore/ParameterSet/interface/ThreadsInfo.h" #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/ServiceRegistry/interface/ServiceToken.h" @@ -27,6 +28,7 @@ #include "FWCore/PluginManager/interface/PluginManager.h" #include "FWCore/PluginManager/interface/standard.h" +#include "FWCore/Concurrency/interface/setNThreads.h" #include "FWCore/Utilities/interface/thread_safety_macros.h" namespace { @@ -41,6 +43,22 @@ namespace { return iDesc; } + //TBB only allows 1 task_scheduler_init active on a thread. + CMS_THREAD_SAFE std::unique_ptr tsiPtr; + + std::shared_ptr setupThreading(std::shared_ptr iDesc) { + // check the "options" ParameterSet + std::shared_ptr pset = iDesc->getProcessPSet(); + auto threadsInfo = threadOptions(*pset); + + threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr); + + // update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet + setThreadOptions(threadsInfo, *pset); + + return iDesc; + } + edm::ServiceToken createJobReport() { return edm::ServiceRegistry::createContaining( std::make_shared>( @@ -61,7 +79,7 @@ namespace { // PythonEventProcessor::PythonEventProcessor(PyBind11ProcessDesc const& iDesc) : forcePluginSetupFirst_(setupPluginSystem()), - processor_(addDefaultServicesToProcessDesc(iDesc.processDesc()), + processor_(addDefaultServicesToProcessDesc(setupThreading(iDesc.processDesc())), createJobReport(), edm::serviceregistry::kOverlapIsError) {}