Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add globalBeginJob() interface for stream modules using GlobalCache #29585

Merged
merged 1 commit into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions FWCore/Framework/interface/stream/EDAnalyzerAdaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ namespace edm {
m_lumiSummaries.resize(iNLumis);
}

void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
void doEndJob() final { MyGlobal::endJob(m_global.get()); }
void setupRun(EDAnalyzerBase* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
void streamEndRunSummary(EDAnalyzerBase* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/stream/EDAnalyzerAdaptorBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ namespace edm {
Principal const& iPrincipal) const {}

virtual void setupStreamModules() = 0;
void doBeginJob();
virtual void doBeginJob() = 0;
virtual void doEndJob() = 0;

void doBeginStream(StreamID id);
Expand Down
1 change: 1 addition & 0 deletions FWCore/Framework/interface/stream/ProducingModuleAdaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace edm {
m_lumis.resize(iNLumis);
m_lumiSummaries.resize(iNLumis);
}
void doBeginJob() final { MyGlobal::beginJob(m_global.get()); }
void doEndJob() final { MyGlobal::endJob(m_global.get()); }
void setupRun(M* iProd, RunIndex iIndex) final { MyGlobalRun::set(iProd, m_runs[iIndex].get()); }
void streamEndRunSummary(M* iProd, edm::Run const& iRun, edm::EventSetup const& iES) final {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ namespace edm {
void doPreallocate(PreallocationConfiguration const&);
virtual void preallocLumis(unsigned int) {}
virtual void setupStreamModules() = 0;
void doBeginJob();
virtual void doBeginJob() = 0;
virtual void doEndJob() = 0;

void doBeginStream(StreamID id);
Expand Down
14 changes: 14 additions & 0 deletions FWCore/Framework/interface/stream/callAbilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

// system include files
#include <memory>
#include <type_traits>
// user include files
#include "FWCore/Framework/interface/stream/dummy_helpers.h"

Expand All @@ -32,17 +33,30 @@ namespace edm {
//********************************
// CallGlobal
//********************************
namespace callGlobalDetail {
template <typename, typename = std::void_t<>>
struct has_globalBeginJob : std::false_type {};

template <typename T>
struct has_globalBeginJob<T, std::void_t<decltype(T::globalBeginJob(nullptr))>> : std::true_type {};
} // namespace callGlobalDetail
template <typename T, bool>
struct CallGlobalImpl {
template <typename B>
static void set(B* iProd, typename T::GlobalCache const* iCache) {
static_cast<T*>(iProd)->setGlobalCache(iCache);
}
static void beginJob(typename T::GlobalCache* iCache) {
if constexpr (callGlobalDetail::has_globalBeginJob<T>::value) {
T::globalBeginJob(iCache);
}
}
static void endJob(typename T::GlobalCache* iCache) { T::globalEndJob(iCache); }
};
template <typename T>
struct CallGlobalImpl<T, false> {
static void set(void* iProd, void const* iCache) {}
static void beginJob(void* iCache) {}
static void endJob(void* iCache) {}
};

Expand Down
1 change: 0 additions & 1 deletion FWCore/Framework/src/stream/EDAnalyzerAdaptorBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ bool EDAnalyzerAdaptorBase::doEvent(EventPrincipal const& ep,
mod->analyze(e, c);
return true;
}
void EDAnalyzerAdaptorBase::doBeginJob() {}

void EDAnalyzerAdaptorBase::doBeginStream(StreamID id) { m_streamModules[id]->beginStream(id); }
void EDAnalyzerAdaptorBase::doEndStream(StreamID id) { m_streamModules[id]->endStream(); }
Expand Down
3 changes: 0 additions & 3 deletions FWCore/Framework/src/stream/ProducingModuleAdaptorBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ namespace edm {
return m_streamModules[0]->indiciesForPutProducts(iBranchType);
}

template <typename T>
void ProducingModuleAdaptorBase<T>::doBeginJob() {}

template <typename T>
void ProducingModuleAdaptorBase<T>::doBeginStream(StreamID id) {
m_streamModules[id]->beginStream(id);
Expand Down
38 changes: 37 additions & 1 deletion FWCore/Framework/test/stream_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,36 @@ class testStreamFilter : public CppUnit::TestFixture {
++m_count;
}
};
class GlobalProdWithBeginJob : public edm::stream::EDFilter<edm::GlobalCache<int>> {
public:
static unsigned int m_count;

static std::unique_ptr<int> initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<int>(1); }
GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); }

static void globalBeginJob(int* iGlobal) {
CPPUNIT_ASSERT(1 == *iGlobal);
*iGlobal = 2;
++m_count;
}

void beginStream(edm::StreamID) override {
int* iGlobal = const_cast<int*>(globalCache());
CPPUNIT_ASSERT(2 == *iGlobal);
*iGlobal = 3;
++m_count;
}

bool filter(edm::Event&, edm::EventSetup const&) override {
++m_count;
return true;
}

static void globalEndJob(int* iGlobal) {
CPPUNIT_ASSERT(3 == *iGlobal);
++m_count;
}
};
class RunProd : public edm::stream::EDFilter<edm::RunCache<int>> {
public:
static unsigned int m_count;
Expand Down Expand Up @@ -356,6 +386,7 @@ class testStreamFilter : public CppUnit::TestFixture {
};
unsigned int testStreamFilter::BasicProd::m_count = 0;
unsigned int testStreamFilter::GlobalProd::m_count = 0;
unsigned int testStreamFilter::GlobalProdWithBeginJob::m_count = 0;
unsigned int testStreamFilter::RunProd::m_count = 0;
unsigned int testStreamFilter::LumiProd::m_count = 0;
unsigned int testStreamFilter::RunSummaryProd::m_count = 0;
Expand Down Expand Up @@ -422,6 +453,7 @@ testStreamFilter::testStreamFilter()
m_actReg.reset(new edm::ActivityRegistry);

//For each transition, bind a lambda which will call the proper method of the Worker
m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); };
m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) {
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->beginStream(s_streamID0, streamContext);
Expand Down Expand Up @@ -483,6 +515,7 @@ testStreamFilter::testStreamFilter()
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->endStream(s_streamID0, streamContext);
};
m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); };
}

namespace {
Expand Down Expand Up @@ -529,7 +562,10 @@ void testStreamFilter::runTest(Expectations const& iExpect) {

void testStreamFilter::basicTest() { runTest<BasicProd>({Trans::kEvent}); }

void testStreamFilter::globalTest() { runTest<GlobalProd>({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); }
void testStreamFilter::globalTest() {
runTest<GlobalProd>({Trans::kEvent, Trans::kEndJob});
runTest<GlobalProdWithBeginJob>({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob});
}

void testStreamFilter::runTest() { runTest<RunProd>({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); }

Expand Down
35 changes: 34 additions & 1 deletion FWCore/Framework/test/stream_producer_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,33 @@ class testStreamProducer : public CppUnit::TestFixture {
++m_count;
}
};
class GlobalProdWithBeginJob : public edm::stream::EDProducer<edm::GlobalCache<int>> {
public:
static unsigned int m_count;

static std::unique_ptr<int> initializeGlobalCache(edm::ParameterSet const&) { return std::make_unique<int>(1); }
GlobalProdWithBeginJob(edm::ParameterSet const&, const int* iGlobal) { CPPUNIT_ASSERT(*iGlobal == 1); }

static void globalBeginJob(int* iGlobal) {
CPPUNIT_ASSERT(1 == *iGlobal);
*iGlobal = 2;
++m_count;
}

void beginStream(edm::StreamID) override {
int* iGlobal = const_cast<int*>(globalCache());
CPPUNIT_ASSERT(2 == *iGlobal);
*iGlobal = 3;
++m_count;
}

void produce(edm::Event&, edm::EventSetup const&) override { ++m_count; }

static void globalEndJob(int* iGlobal) {
CPPUNIT_ASSERT(3 == *iGlobal);
++m_count;
}
};
class RunProd : public edm::stream::EDProducer<edm::RunCache<int>> {
public:
static unsigned int m_count;
Expand Down Expand Up @@ -320,6 +347,7 @@ class testStreamProducer : public CppUnit::TestFixture {
};
unsigned int testStreamProducer::BasicProd::m_count = 0;
unsigned int testStreamProducer::GlobalProd::m_count = 0;
unsigned int testStreamProducer::GlobalProdWithBeginJob::m_count = 0;
unsigned int testStreamProducer::RunProd::m_count = 0;
unsigned int testStreamProducer::LumiProd::m_count = 0;
unsigned int testStreamProducer::RunSummaryProd::m_count = 0;
Expand Down Expand Up @@ -386,6 +414,7 @@ testStreamProducer::testStreamProducer()
m_actReg.reset(new edm::ActivityRegistry);

//For each transition, bind a lambda which will call the proper method of the Worker
m_transToFunc[Trans::kBeginJob] = [](edm::Worker* iBase) { iBase->beginJob(); };
m_transToFunc[Trans::kBeginStream] = [](edm::Worker* iBase) {
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->beginStream(s_streamID0, streamContext);
Expand Down Expand Up @@ -447,6 +476,7 @@ testStreamProducer::testStreamProducer()
edm::StreamContext streamContext(s_streamID0, nullptr);
iBase->endStream(s_streamID0, streamContext);
};
m_transToFunc[Trans::kEndJob] = [](edm::Worker* iBase) { iBase->endJob(); };
}

namespace {
Expand Down Expand Up @@ -493,7 +523,10 @@ void testStreamProducer::runTest(Expectations const& iExpect) {

void testStreamProducer::basicTest() { runTest<BasicProd>({Trans::kEvent}); }

void testStreamProducer::globalTest() { runTest<GlobalProd>({Trans::kBeginJob, Trans::kEvent, Trans::kEndJob}); }
void testStreamProducer::globalTest() {
runTest<GlobalProd>({Trans::kEvent, Trans::kEndJob});
runTest<GlobalProdWithBeginJob>({Trans::kBeginJob, Trans::kBeginStream, Trans::kEvent, Trans::kEndJob});
}

void testStreamProducer::runTest() { runTest<RunProd>({Trans::kGlobalBeginRun, Trans::kEvent, Trans::kGlobalEndRun}); }

Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamAnalyzers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ namespace edmtest {
});
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

void analyze(edm::Event const&, edm::EventSetup const&) {
++m_count;
++((globalCache())->value);
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamFilters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ namespace edmtest {
produces<unsigned int>();
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

bool filter(edm::Event&, edm::EventSetup const&) override {
++m_count;
++((globalCache())->value);
Expand Down
7 changes: 7 additions & 0 deletions FWCore/Framework/test/stubs/TestStreamProducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ namespace edmtest {
produces<unsigned int>();
}

static void globalBeginJob(Cache* iGlobal) {
++m_count;
if (iGlobal->value != 0) {
throw cms::Exception("cache value") << iGlobal->value << " but it was supposed to be 0";
}
}

void produce(edm::Event&, edm::EventSetup const&) override {
++m_count;
++((globalCache())->value);
Expand Down
6 changes: 3 additions & 3 deletions FWCore/Framework/test/test_stream_modules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


process.GlobIntProd = cms.EDProducer("edmtest::stream::GlobalIntProducer",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand Down Expand Up @@ -75,7 +75,7 @@


process.GlobIntAn = cms.EDAnalyzer("edmtest::stream::GlobalIntAnalyzer",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand All @@ -100,7 +100,7 @@
)

process.GlobIntFil = cms.EDFilter("edmtest::stream::GlobalIntFilter",
transitions = cms.int32(nEvt+2)
transitions = cms.int32(nEvt+3)
,cachevalue = cms.int32(nEvt)
)

Expand Down