Skip to content

Commit

Permalink
Merge pull request #30908 from Dr15Jones/fixCmsRunThreads
Browse files Browse the repository at this point in the history
Properly set number of threads in CmsRun python module
  • Loading branch information
cmsbuild authored Jul 25, 2020
2 parents 387daac + a5c0738 commit e57feef
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 99 deletions.
17 changes: 17 additions & 0 deletions FWCore/Concurrency/interface/setNThreads.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef FWCore_Concurrency_setNThreads_h
#define FWCore_Concurrency_setNThreads_h
//
// setNThreads.h
// CMSSW
//
// Created by Chris Jones on 7/24/20.
//
#include <memory>
#include "tbb/task_scheduler_init.h"

namespace edm {
unsigned int setNThreads(unsigned int iNThreads,
unsigned int iStackSize,
std::unique_ptr<tbb::task_scheduler_init>& oPtr);
}
#endif /* FWCore_Concurrency_setNThreads_h */
32 changes: 32 additions & 0 deletions FWCore/Concurrency/src/setNThreads.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//
// setNThreads.cc
// CMSSW
//
// Created by Chris Jones on 7/24/20.
//

#include "FWCore/Concurrency/interface/setNThreads.h"

namespace edm {
unsigned int setNThreads(unsigned int iNThreads,
unsigned int iStackSize,
std::unique_ptr<tbb::task_scheduler_init>& oPtr) {
//The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's
// destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads.
// This provides a clean synchronization point.
//We have to destroy the old scheduler before starting a new one in order to
// get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads

//stack size is given in KB but passed in as bytes
iStackSize *= 1024;

oPtr.reset();
if (0 == iNThreads) {
//Allow TBB to decide how many threads. This is normally the number of CPUs in the machine.
iNThreads = tbb::task_scheduler_init::default_num_threads();
}
oPtr = std::make_unique<tbb::task_scheduler_init>(static_cast<int>(iNThreads), iStackSize);

return iNThreads;
}
} // namespace edm
58 changes: 10 additions & 48 deletions FWCore/Framework/bin/cmsRun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ PSet script. See notes in EventProcessor.cpp for details about it.
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ProcessDesc.h"
#include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
#include "FWCore/ParameterSet/interface/ThreadsInfo.h"
#include "FWCore/PluginManager/interface/PluginManager.h"
#include "FWCore/PluginManager/interface/PresenceFactory.h"
#include "FWCore/PluginManager/interface/standard.h"
#include "FWCore/ParameterSetReader/interface/ParameterSetReader.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/ServiceWrapper.h"
#include "FWCore/Concurrency/interface/setNThreads.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/ConvertException.h"
Expand Down Expand Up @@ -58,7 +59,6 @@ static char const* const kHelpOpt = "help";
static char const* const kHelpCommandOpt = "help,h";
static char const* const kStrictOpt = "strict";

constexpr unsigned int kDefaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB
// -----------------------------------------------
namespace {
class EventProcessorWithSentry {
Expand Down Expand Up @@ -92,27 +92,6 @@ namespace {
bool callEndJob_;
};

unsigned int setNThreads(unsigned int iNThreads,
unsigned int iStackSize,
std::unique_ptr<tbb::task_scheduler_init>& oPtr) {
//The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's
// destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads.
// This provides a clean synchronization point.
//We have to destroy the old scheduler before starting a new one in order to
// get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads

//stack size is given in KB but passed in as bytes
iStackSize *= 1024;

oPtr.reset();
if (0 == iNThreads) {
//Allow TBB to decide how many threads. This is normally the number of CPUs in the machine.
iNThreads = tbb::task_scheduler_init::default_num_threads();
}
oPtr = std::make_unique<tbb::task_scheduler_init>(static_cast<int>(iNThreads), iStackSize);

return iNThreads;
}
} // namespace

int main(int argc, char* argv[]) {
Expand All @@ -126,7 +105,7 @@ int main(int argc, char* argv[]) {
//NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job
// else we get a crash. So for now we can't have any services use tasks in their constructors.
std::unique_ptr<tbb::task_scheduler_init> tsiPtr = std::make_unique<tbb::task_scheduler_init>(
edm::s_defaultNumberOfThreads, kDefaultSizeOfStackForThreadsInKB * 1024);
edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024);
std::shared_ptr<edm::Presence> theMessageServicePresence;
std::unique_ptr<std::ofstream> jobReportStreamPtr;
std::shared_ptr<edm::serviceregistry::ServiceWrapper<edm::JobReport> > jobRep;
Expand Down Expand Up @@ -287,43 +266,26 @@ int main(int argc, char* argv[]) {
// Finally, reflect the values being used in the "options" top level ParameterSet.
context = "Setting up number of threads";
{
// default values
unsigned int nThreads = edm::s_defaultNumberOfThreads;
unsigned int stackSize = kDefaultSizeOfStackForThreadsInKB;

// check the "options" ParameterSet
std::shared_ptr<edm::ParameterSet> pset = processDesc->getProcessPSet();
if (pset->existsAs<edm::ParameterSet>("options", false)) {
auto const& ops = pset->getUntrackedParameterSet("options");
if (ops.existsAs<unsigned int>("numberOfThreads", false)) {
nThreads = ops.getUntrackedParameter<unsigned int>("numberOfThreads");
}
if (ops.existsAs<unsigned int>("sizeOfStackForThreadsInKB", false)) {
stackSize = ops.getUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB");
}
}
auto threadsInfo = threadOptions(*pset);

// check the command line options
if (vm.count(kNumberOfThreadsOpt)) {
nThreads = vm[kNumberOfThreadsOpt].as<unsigned int>();
threadsInfo.nThreads_ = vm[kNumberOfThreadsOpt].as<unsigned int>();
}
if (vm.count(kSizeOfStackForThreadOpt)) {
stackSize = vm[kSizeOfStackForThreadOpt].as<unsigned int>();
threadsInfo.stackSize_ = vm[kSizeOfStackForThreadOpt].as<unsigned int>();
}

// if needed, re-initialise TBB
if (nThreads != edm::s_defaultNumberOfThreads or stackSize != kDefaultSizeOfStackForThreadsInKB) {
nThreads = setNThreads(nThreads, stackSize, tsiPtr);
if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or
threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) {
threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
}

// update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet
edm::ParameterSet newOp;
if (pset->existsAs<edm::ParameterSet>("options", false)) {
newOp = pset->getUntrackedParameterSet("options");
}
newOp.addUntrackedParameter<unsigned int>("numberOfThreads", nThreads);
newOp.addUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB", stackSize);
pset->insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false));
setThreadOptions(threadsInfo, *pset);
}

context = "Initializing default service configurations";
Expand Down
58 changes: 10 additions & 48 deletions FWCore/Framework/bin/cmsRunPython3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ PSet script. See notes in EventProcessor.cpp for details about it.
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ProcessDesc.h"
#include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h"
#include "FWCore/ParameterSet/interface/ThreadsInfo.h"
#include "FWCore/PluginManager/interface/PluginManager.h"
#include "FWCore/PluginManager/interface/PresenceFactory.h"
#include "FWCore/PluginManager/interface/standard.h"
//#include "FWCore/ParameterSetReader/interface/ParameterSetReader.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/ServiceWrapper.h"
#include "FWCore/Concurrency/interface/setNThreads.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/ConvertException.h"
Expand Down Expand Up @@ -58,7 +59,6 @@ static char const* const kHelpOpt = "help";
static char const* const kHelpCommandOpt = "help,h";
static char const* const kStrictOpt = "strict";

constexpr unsigned int kDefaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB
// -----------------------------------------------
namespace {
class EventProcessorWithSentry {
Expand Down Expand Up @@ -92,27 +92,6 @@ namespace {
bool callEndJob_;
};

unsigned int setNThreads(unsigned int iNThreads,
unsigned int iStackSize,
std::unique_ptr<tbb::task_scheduler_init>& oPtr) {
//The TBB documentation doesn't explicitly say this, but when the task_scheduler_init's
// destructor is run it does a 'wait all' for all tasks to finish and then shuts down all the threads.
// This provides a clean synchronization point.
//We have to destroy the old scheduler before starting a new one in order to
// get tbb to actually switch the number of threads. If we do not, tbb stays at 1 threads

//stack size is given in KB but passed in as bytes
iStackSize *= 1024;

oPtr.reset();
if (0 == iNThreads) {
//Allow TBB to decide how many threads. This is normally the number of CPUs in the machine.
iNThreads = tbb::task_scheduler_init::default_num_threads();
}
oPtr = std::make_unique<tbb::task_scheduler_init>(static_cast<int>(iNThreads), iStackSize);

return iNThreads;
}
} // namespace

int main(int argc, char* argv[]) {
Expand All @@ -126,7 +105,7 @@ int main(int argc, char* argv[]) {
//NOTE: with new version of TBB (44_20160316oss) we can only construct 1 tbb::task_scheduler_init per job
// else we get a crash. So for now we can't have any services use tasks in their constructors.
std::unique_ptr<tbb::task_scheduler_init> tsiPtr = std::make_unique<tbb::task_scheduler_init>(
edm::s_defaultNumberOfThreads, kDefaultSizeOfStackForThreadsInKB * 1024);
edm::s_defaultNumberOfThreads, edm::s_defaultSizeOfStackForThreadsInKB * 1024);
std::shared_ptr<edm::Presence> theMessageServicePresence;
std::unique_ptr<std::ofstream> jobReportStreamPtr;
std::shared_ptr<edm::serviceregistry::ServiceWrapper<edm::JobReport> > jobRep;
Expand Down Expand Up @@ -287,43 +266,26 @@ int main(int argc, char* argv[]) {
// Finally, reflect the values being used in the "options" top level ParameterSet.
context = "Setting up number of threads";
{
// default values
unsigned int nThreads = edm::s_defaultNumberOfThreads;
unsigned int stackSize = kDefaultSizeOfStackForThreadsInKB;

// check the "options" ParameterSet
std::shared_ptr<edm::ParameterSet> pset = processDesc->getProcessPSet();
if (pset->existsAs<edm::ParameterSet>("options", false)) {
auto const& ops = pset->getUntrackedParameterSet("options");
if (ops.existsAs<unsigned int>("numberOfThreads", false)) {
nThreads = ops.getUntrackedParameter<unsigned int>("numberOfThreads");
}
if (ops.existsAs<unsigned int>("sizeOfStackForThreadsInKB", false)) {
stackSize = ops.getUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB");
}
}
auto threadsInfo = threadOptions(*pset);

// check the command line options
if (vm.count(kNumberOfThreadsOpt)) {
nThreads = vm[kNumberOfThreadsOpt].as<unsigned int>();
threadsInfo.nThreads_ = vm[kNumberOfThreadsOpt].as<unsigned int>();
}
if (vm.count(kSizeOfStackForThreadOpt)) {
stackSize = vm[kSizeOfStackForThreadOpt].as<unsigned int>();
threadsInfo.stackSize_ = vm[kSizeOfStackForThreadOpt].as<unsigned int>();
}

// if needed, re-initialise TBB
if (nThreads != edm::s_defaultNumberOfThreads or stackSize != kDefaultSizeOfStackForThreadsInKB) {
nThreads = setNThreads(nThreads, stackSize, tsiPtr);
if (threadsInfo.nThreads_ != edm::s_defaultNumberOfThreads or
threadsInfo.stackSize_ != edm::s_defaultSizeOfStackForThreadsInKB) {
threadsInfo.nThreads_ = edm::setNThreads(threadsInfo.nThreads_, threadsInfo.stackSize_, tsiPtr);
}

// update the numberOfThreads and sizeOfStackForThreadsInKB in the "options" ParameterSet
edm::ParameterSet newOp;
if (pset->existsAs<edm::ParameterSet>("options", false)) {
newOp = pset->getUntrackedParameterSet("options");
}
newOp.addUntrackedParameter<unsigned int>("numberOfThreads", nThreads);
newOp.addUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB", stackSize);
pset->insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false));
setThreadOptions(threadsInfo, *pset);
}

context = "Initializing default service configurations";
Expand Down
21 changes: 21 additions & 0 deletions FWCore/ParameterSet/interface/ThreadsInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef FWCore_ParameterSet_ThreadsInfo_h
#define FWCore_ParameterSet_ThreadsInfo_h

namespace edm {

class ParameterSet;
class ParameterSetDescription;

constexpr unsigned int s_defaultNumberOfThreads = 1;
constexpr unsigned int s_defaultSizeOfStackForThreadsInKB = 10 * 1024; //10MB

struct ThreadsInfo {
unsigned int nThreads_ = s_defaultNumberOfThreads;
unsigned int stackSize_ = s_defaultSizeOfStackForThreadsInKB;
};

ThreadsInfo threadOptions(edm::ParameterSet const& pset);
void setThreadOptions(ThreadsInfo const& threadsInfo, edm::ParameterSet& pset);

} // namespace edm
#endif
2 changes: 0 additions & 2 deletions FWCore/ParameterSet/interface/validateTopLevelParameterSets.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ namespace edm {
class ParameterSet;
class ParameterSetDescription;

constexpr unsigned int s_defaultNumberOfThreads = 1;

void validateTopLevelParameterSets(ParameterSet* processParameterSet);
void fillOptionsDescription(ParameterSetDescription& description);
void fillMaxEventsDescription(ParameterSetDescription& description);
Expand Down
36 changes: 36 additions & 0 deletions FWCore/ParameterSet/src/ThreadsInfo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// ThreadsInfo.cc
// CMSSW
//
// Created by Chris Jones on 7/24/20.
//
#include "FWCore/ParameterSet/interface/ThreadsInfo.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"

namespace edm {
ThreadsInfo threadOptions(edm::ParameterSet const& pset) {
// default values
ThreadsInfo threadsInfo;

if (pset.existsAs<edm::ParameterSet>("options", false)) {
auto const& ops = pset.getUntrackedParameterSet("options");
if (ops.existsAs<unsigned int>("numberOfThreads", false)) {
threadsInfo.nThreads_ = ops.getUntrackedParameter<unsigned int>("numberOfThreads");
}
if (ops.existsAs<unsigned int>("sizeOfStackForThreadsInKB", false)) {
threadsInfo.stackSize_ = ops.getUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB");
}
}
return threadsInfo;
}

void setThreadOptions(ThreadsInfo const& threadsInfo, edm::ParameterSet& pset) {
edm::ParameterSet newOp;
if (pset.existsAs<edm::ParameterSet>("options", false)) {
newOp = pset.getUntrackedParameterSet("options");
}
newOp.addUntrackedParameter<unsigned int>("numberOfThreads", threadsInfo.nThreads_);
newOp.addUntrackedParameter<unsigned int>("sizeOfStackForThreadsInKB", threadsInfo.stackSize_);
pset.insertParameterSet(true, "options", edm::ParameterSetEntry(newOp, false));
}
} // namespace edm
1 change: 1 addition & 0 deletions FWCore/ParameterSet/src/validateTopLevelParameterSets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ParameterSet/interface/ThreadsInfo.h"

#include "FWCore/Utilities/interface/EDMException.h"

Expand Down
Loading

0 comments on commit e57feef

Please sign in to comment.