Skip to content

Commit

Permalink
Merge pull request #34084 from Dr15Jones/testModuleLevelThread
Browse files Browse the repository at this point in the history
Test how to do blocking hand-off between TBB and module thread
  • Loading branch information
cmsbuild authored Jun 15, 2021
2 parents 1901690 + 54aa65d commit b9bc3c1
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 30 deletions.
10 changes: 9 additions & 1 deletion FWCore/Integration/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,15 @@
<use name="boost"/>
<use name="clhep"/>
</library>

<library file="TestServicesOnNonFrameworkThreadsAnalyzer.cc" name = "TestServicesOnNonFrameworkThreadsAnalyzer">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/MessageLogger"/>
<use name="clhep"/>
</library>
<test name="TestFWCoreIntegrationModuleThread" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/moduleThread_test_cfg.py"/>

<bin file="RandomIntProducer_t.cpp">
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
Expand Down
119 changes: 119 additions & 0 deletions FWCore/Integration/test/TestServicesOnNonFrameworkThreadsAnalyzer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/ModuleContextSentry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "FWCore/Utilities/interface/RandomNumberGenerator.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <exception>

#include "CLHEP/Random/RandFlat.h"

namespace edmtest {
class TestServicesOnNonFrameworkThreadsAnalyzer : public edm::stream::EDAnalyzer<> {
public:
TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&);
~TestServicesOnNonFrameworkThreadsAnalyzer() override;

void analyze(edm::Event const&, edm::EventSetup const&) final;

private:
void runOnOtherThread();
void shutdownThread();
std::unique_ptr<std::thread> m_thread;
std::mutex m_mutex;
std::condition_variable m_condVar;

bool m_managerThreadReady = false;
bool m_continueProcessing = false;
bool m_eventWorkDone = false;

//context info
edm::ModuleCallingContext const* m_moduleCallingContext = nullptr;
edm::ServiceToken* m_serviceToken = nullptr;
edm::StreamID m_streamID;
std::exception_ptr m_except;
};

TestServicesOnNonFrameworkThreadsAnalyzer::TestServicesOnNonFrameworkThreadsAnalyzer(edm::ParameterSet const&)
: m_streamID(edm::StreamID::invalidStreamID()) {
m_thread = std::make_unique<std::thread>([this]() { this->runOnOtherThread(); });

m_mutex.lock();
m_managerThreadReady = true;
m_continueProcessing = true;
}

TestServicesOnNonFrameworkThreadsAnalyzer::~TestServicesOnNonFrameworkThreadsAnalyzer() {
if (m_thread) {
shutdownThread();
}
}

void TestServicesOnNonFrameworkThreadsAnalyzer::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
m_eventWorkDone = false;
m_moduleCallingContext = iEvent.moduleCallingContext();
edm::ServiceToken token = edm::ServiceRegistry::instance().presentToken();
m_serviceToken = &token;
m_streamID = iEvent.streamID();
{ edm::LogSystem("FrameworkThread") << "new Event"; }
m_mutex.unlock();
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condVar.notify_one();
m_condVar.wait(lk, [this] { return this->m_eventWorkDone; });
lk.release();
}
edm::LogSystem("FrameworkThread") << " done";
m_managerThreadReady = true;
if (m_except) {
std::rethrow_exception(m_except);
}
}

void TestServicesOnNonFrameworkThreadsAnalyzer::runOnOtherThread() {
std::unique_lock<std::mutex> lk(m_mutex);

do {
m_condVar.wait(lk, [this] { return m_managerThreadReady; });
if (m_continueProcessing) {
edm::ModuleCallingContext newContext(*m_moduleCallingContext);
edm::ModuleContextSentry sentry(&newContext, m_moduleCallingContext->parent());

edm::ServiceRegistry::Operate srSentry(*m_serviceToken);
try {
edm::Service<edm::RandomNumberGenerator> rng;
edm::Service<edm::MessageLogger> ml;
ml->setThreadContext(*m_moduleCallingContext);
edm::LogSystem("ModuleThread") << " ++running with rng "
<< CLHEP::RandFlat::shootInt(&rng->getEngine(m_streamID), 10);
} catch (...) {
m_except = std::current_exception();
}
}
m_eventWorkDone = true;
m_managerThreadReady = false;
lk.unlock();
m_condVar.notify_one();
lk.lock();
} while (m_continueProcessing);
}

void TestServicesOnNonFrameworkThreadsAnalyzer::shutdownThread() {
m_continueProcessing = false;
m_mutex.unlock();
m_condVar.notify_one();
m_thread->join();
}

} // namespace edmtest

DEFINE_FWK_MODULE(edmtest::TestServicesOnNonFrameworkThreadsAnalyzer);
15 changes: 15 additions & 0 deletions FWCore/Integration/test/moduleThread_test_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("Test")

process.source = cms.Source("EmptySource")

process.maxEvents.input = 10

process.test = cms.EDAnalyzer("edmtest::TestServicesOnNonFrameworkThreadsAnalyzer")

process.p = cms.EndPath(process.test)

process.add_(cms.Service("RandomNumberGeneratorService",
test = cms.PSet(initialSeed = cms.untracked.uint32(12345))
))
41 changes: 41 additions & 0 deletions FWCore/MessageLogger/interface/edm_MessageLogger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#ifndef FWCore_MessageService_MessageLogger_h
#define FWCore_MessageService_MessageLogger_h

// -*- C++ -*-
//
// Package: MessageService
// Class : MessageLogger
//
/**\class edm::MessageLogger MessageLogger.h FWCore/MessageService/plugins/MessageLogger.h
Description: Abstract interface for MessageLogger Service
Usage:
<usage>
*/
//

// system include files

// user include files

// forward declarations

namespace edm {
class ModuleCallingContext;

class MessageLogger {
public:
virtual ~MessageLogger();

virtual void setThreadContext(ModuleCallingContext const&) = 0;

protected:
MessageLogger() = default;

}; // MessageLogger

} // namespace edm

#endif // FWCore_MessageService_MessageLogger_h
10 changes: 10 additions & 0 deletions FWCore/MessageLogger/src/edm_MessageLogger.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// MessageLogger.cc
// CMSSW
//
// Created by Chris Jones on 6/10/21.
//

#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"

edm::MessageLogger::~MessageLogger() = default;
2 changes: 0 additions & 2 deletions FWCore/MessageService/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
<use name="DataFormats/Provenance"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Utilities"/>
<use name="tbb"/>
<export>
Expand Down
2 changes: 2 additions & 0 deletions FWCore/MessageService/plugins/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<library file="*.cc" name="FWCoreMessageServicePlugins">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/MessageService"/>
<use name="FWCore/ServiceRegistry"/>
<use name="DataFormats/Provenance"/>
</library>
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"
#include "FWCore/MessageService/src/MessageServicePSetValidation.h"
#include "MessageLogger.h"
#include "MessageServicePSetValidation.h"

#include "FWCore/MessageLogger/interface/MessageLoggerQ.h"
#include "FWCore/MessageLogger/interface/MessageDrop.h"
Expand Down Expand Up @@ -325,6 +325,29 @@ namespace edm {
nonModule_errorEnabled = messageDrop->errorEnabled;
} // ctor

void MessageLogger::setThreadContext(ModuleCallingContext const& iModContext) {
//need to know if we are in a global or stream context
auto top = iModContext.getTopModuleCallingContext();
assert(nullptr != top);
if (ParentContext::Type::kGlobal == top->type()) {
auto globalContext = iModContext.getGlobalContext();
auto tran = globalContext->transition();
if (tran == GlobalContext::Transition::kBeginLuminosityBlock or
tran == GlobalContext::Transition::kEndLuminosityBlock) {
establishModule(lumiInfoBegin_ + globalContext->luminosityBlockIndex(),
iModContext,
s_globalTransitionNames[static_cast<int>(tran)]);
} else {
establishModule(
runInfoBegin_ + globalContext->runIndex(), iModContext, s_globalTransitionNames[static_cast<int>(tran)]);
}
} else {
auto stream = iModContext.getStreamContext();
establishModule(
stream->streamID().value(), iModContext, s_streamTransitionNames[static_cast<int>(stream->transition())]);
}
}

//
// Shared helper routines for establishing module name and enabling behavior
//
Expand Down Expand Up @@ -867,22 +890,22 @@ namespace edm {
}

void MessageLogger::postEndJob() {
SummarizeInJobReport(); // Put summary info into Job Rep // change log 10
summarizeInJobReport(); // Put summary info into Job Rep // change log 10
MessageLoggerQ::MLqSUM(); // trigger summary info. // change log 9
}

void MessageLogger::jobFailure() {
MessageDrop* messageDrop = MessageDrop::instance();
messageDrop->setSinglet("jobFailure");
SummarizeInJobReport(); // Put summary info into Job Rep // change log 10
summarizeInJobReport(); // Put summary info into Job Rep // change log 10
MessageLoggerQ::MLqSUM(); // trigger summary info. // change log 9
}

//
// Other methods
//

void MessageLogger::SummarizeInJobReport() {
void MessageLogger::summarizeInJobReport() {
if (fjrSummaryRequested_) {
std::map<std::string, double>* smp = new std::map<std::string, double>();
MessageLoggerQ::MLqJRS(smp);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#ifndef FWCore_MessageService_MessageLogger_h
#define FWCore_MessageService_MessageLogger_h
#ifndef FWCore_MessageService_plugins_MessageLogger_h
#define FWCore_MessageService_plugins_MessageLogger_h

// -*- C++ -*-
//
// Package: Services
// Package: MessageService
// Class : MessageLogger
//
/**\class MessageLogger MessageLogger.h FWCore/MessageService/interface/MessageLogger.h
/**\class edm::service::MessageLogger MessageLogger.h FWCore/MessageService/plugins/MessageLogger.h
Description: <one line class summary>
Expand All @@ -33,7 +33,7 @@

#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/MessageLogger/interface/ELseverityLevel.h"
#include "FWCore/MessageLogger/interface/ErrorObj.h"
#include "FWCore/MessageLogger/interface/edm_MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

Expand All @@ -44,18 +44,15 @@ namespace edm {
class ParameterSet;
namespace service {

class MessageLogger {
class MessageLogger : public edm::MessageLogger {
public:
MessageLogger(ParameterSet const&, ActivityRegistry&);

void fillErrorObj(edm::ErrorObj& obj) const;
bool debugEnabled() const { return debugEnabled_; }

static bool anyDebugEnabled() { return anyDebugEnabled_; }

static void SummarizeInJobReport();
void setThreadContext(ModuleCallingContext const&) final;

private:
static void summarizeInJobReport();

void postBeginJob();
void preEndJob();
void postEndJob();
Expand Down Expand Up @@ -193,4 +190,4 @@ namespace edm {

} // namespace edm

#endif // FWCore_MessageService_MessageLogger_h
#endif // FWCore_MessageService_plugins_MessageLogger_h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// user include files

#include "FWCore/MessageService/src/MessageServicePSetValidation.h"
#include "MessageServicePSetValidation.h"

using namespace edm;
using namespace edm::service;
Expand Down
7 changes: 5 additions & 2 deletions FWCore/MessageService/plugins/Module.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "FWCore/PluginManager/interface/PresenceMacros.h"
#include "FWCore/MessageService/interface/MessageLogger.h"
#include "MessageLogger.h"
#include "FWCore/MessageService/interface/SingleThreadMSPresence.h"
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"

#pragma GCC visibility push(hidden)
using edm::service::MessageLogger;
using edm::service::SingleThreadMSPresence;
DEFINE_FWK_SERVICE(MessageLogger);

using MessageLoggerMaker = edm::serviceregistry::AllArgsMaker<edm::MessageLogger, MessageLogger>;
DEFINE_FWK_SERVICE_MAKER(MessageLogger, MessageLoggerMaker);

DEFINE_FWK_PRESENCE(SingleThreadMSPresence);
#pragma GCC visibility pop
2 changes: 0 additions & 2 deletions HLTrigger/special/plugins/HLTCountNumberOfObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"

#include "HLTrigger/HLTcore/interface/HLTFilter.h"
#include "DataFormats/HLTReco/interface/TriggerFilterObjectWithRefs.h"
#include "HLTrigger/HLTcore/interface/defaultModuleLabel.h"
Expand Down
2 changes: 0 additions & 2 deletions HLTrigger/special/plugins/HLTTrackWithHits.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include "FWCore/ParameterSet/interface/ParameterSet.h"

#include "FWCore/MessageService/interface/MessageLogger.h"

#include "HLTrigger/HLTcore/interface/HLTFilter.h"
#include "DataFormats/HLTReco/interface/TriggerFilterObjectWithRefs.h"
#include "DataFormats/TrackReco/interface/Track.h"
Expand Down
1 change: 0 additions & 1 deletion L1Trigger/TrackFindingTMTT/plugins/TMTrackProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "L1Trigger/TrackFindingTMTT/interface/Array2D.h"
#include "L1Trigger/TrackFindingTMTT/interface/PrintL1trk.h"

#include "FWCore/MessageService/interface/MessageLogger.h"
#include "FWCore/Framework/interface/ESHandle.h"

#include <iostream>
Expand Down
Loading

0 comments on commit b9bc3c1

Please sign in to comment.