Skip to content

Commit

Permalink
Cache EventMetaData in GlobalEvFOutputModule
Browse files Browse the repository at this point in the history
- event meta data is cached at begin job or when new file is opened
- refactored StreamerOutputModuleCommon to be two classes so buffer
  could be handled separately for the caching
- added many unit tests to show GlobalEvFOutputModule properly
  writes the expected events and the data products are correct
  • Loading branch information
Dr15Jones committed May 9, 2024
1 parent 01b9d5d commit d9b3643
Show file tree
Hide file tree
Showing 18 changed files with 1,001 additions and 362 deletions.
2 changes: 1 addition & 1 deletion DQMServices/StreamerIO/plugins/DQMStreamerReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ namespace dqmservices {
} else {
//skipping
eview = getEventMsg();
assert( (eview==nullptr) or (not eview->isEventMetaData()));
assert((eview == nullptr) or (not eview->isEventMetaData()));
if (eview == nullptr) {
closeFileImp_("eof");
continue;
Expand Down
110 changes: 80 additions & 30 deletions EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "IOPool/Streamer/interface/StreamerOutputFile.h"
#include "FWCore/Framework/interface/global/OutputModule.h"

#include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h"
#include "IOPool/Streamer/interface/StreamerOutputMsgBuilders.h"
#include "FWCore/Utilities/interface/EDGetToken.h"
#include "DataFormats/Streamer/interface/StreamedProducts.h"

Expand All @@ -42,13 +42,35 @@ namespace evf {

class FastMonitoringService;

struct MetaDataCache {
MetaDataCache(StreamerOutputMsgBuilders const& builders,
edm::BranchIDLists const& branchLists,
edm::ThinnedAssociationsHelper const helper)
: buffer_() {
auto ret = builders.serializeEventMetaData(buffer_, branchLists, helper);
builder_ = std::move(ret.first);
checksum_ = ret.second;
}
SerializeDataBuffer buffer_;
std::unique_ptr<EventMsgBuilder> builder_;
uint32_t checksum_;
};

class GlobalEvFOutputEventWriter {
public:
explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls)
: filePath_(filePath), ls_(ls), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
explicit GlobalEvFOutputEventWriter(std::string const& filePath,
unsigned int ls,
std::shared_ptr<MetaDataCache const> iMetaData)
: filePath_(filePath),
ls_(ls),
accepted_(0),
stream_writer_events_(new StreamerOutputFile(filePath)),
meta_(std::move(iMetaData)) {}

~GlobalEvFOutputEventWriter() {}

void setMetaCache(std::shared_ptr<MetaDataCache const> iMetaData) { meta_ = std::move(iMetaData); }

bool close() {
stream_writer_events_->close();
return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
Expand All @@ -69,9 +91,14 @@ namespace evf {
return;
}
auto group = iHolder.group();
writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() mutable {
try {
std::unique_ptr<EventMsgBuilder> own(msg);
if (meta_) {
auto m = std::move(meta_);
assert(m->builder_);
doOutputEvent(*m->builder_);
}
doOutputEvent(*msg); //msg is written and discarded at this point
} catch (...) {
auto tmp = holder;
Expand Down Expand Up @@ -117,6 +144,7 @@ namespace evf {
const unsigned ls_;
std::atomic<unsigned long> accepted_;
edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
std::shared_ptr<MetaDataCache const> meta_;
edm::SerialTaskQueue writeQueue_;
bool discarded_ = false;
};
Expand Down Expand Up @@ -156,7 +184,8 @@ namespace evf {

typedef edm::global::OutputModule<edm::RunCache<GlobalEvFOutputJSONDef>,
edm::LuminosityBlockCache<evf::GlobalEvFOutputEventWriter>,
edm::StreamCache<StreamerOutputModuleCommon>,
edm::StreamCache<SerializeDataBuffer>,
edm::WatchInputFiles,
edm::ExternalWork>
GlobalEvFOutputModuleType;

Expand All @@ -167,7 +196,7 @@ namespace evf {
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

private:
std::unique_ptr<StreamerOutputModuleCommon> beginStream(edm::StreamID) const final;
std::unique_ptr<SerializeDataBuffer> beginStream(edm::StreamID) const final;

std::shared_ptr<GlobalEvFOutputJSONDef> globalBeginRun(edm::RunForOutput const& run) const final;

Expand All @@ -179,19 +208,31 @@ namespace evf {
void writeRun(edm::RunForOutput const&) final {}
void globalEndRun(edm::RunForOutput const&) const final {}

void respondToOpenInputFile(edm::FileBlock const&) final;
void respondToCloseInputFile(edm::FileBlock const&) final {}

void beginJob() final;
void cacheEventMetaData();

std::shared_ptr<GlobalEvFOutputEventWriter> globalBeginLuminosityBlock(
edm::LuminosityBlockForOutput const& iLB) const final;
void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const final;

Trig getTriggerResults(edm::EDGetTokenT<edm::TriggerResults> const& token, edm::EventForOutput const& e) const;

StreamerOutputModuleCommon::Parameters commonParameters_;
StreamerOutputMsgBuilders::Parameters commonParameters_;
std::unique_ptr<const StreamerOutputMsgBuilders> msgBuilders_;
std::string streamLabel_;
edm::EDGetTokenT<edm::TriggerResults> trToken_;
edm::EDGetTokenT<edm::SendJobHeader::ParameterSetMap> psetToken_;

evf::FastMonitoringService* fms_;

std::shared_ptr<MetaDataCache const> metaDataCache_;
//if a new file appears and has different meta data but the same lumi, we need
// to update the writer to write out the new meta data
mutable std::atomic<GlobalEvFOutputEventWriter*> lastWriter_ = nullptr;
unsigned int presentBranchIDListSize_ = 0;
}; //end-of-class-def

GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
Expand Down Expand Up @@ -287,7 +328,7 @@ namespace evf {
GlobalEvFOutputModule::GlobalEvFOutputModule(edm::ParameterSet const& ps)
: edm::global::OutputModuleBase(ps),
GlobalEvFOutputModuleType(ps),
commonParameters_(StreamerOutputModuleCommon::parameters(ps)),
commonParameters_(StreamerOutputMsgBuilders::parameters(ps)),
streamLabel_(ps.getParameter<std::string>("@module_label")),
trToken_(consumes<edm::TriggerResults>(edm::InputTag("TriggerResults"))),
psetToken_(consumes<edm::SendJobHeader::ParameterSetMap, edm::InRun>(
Expand Down Expand Up @@ -334,24 +375,21 @@ namespace evf {

void GlobalEvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
StreamerOutputModuleCommon::fillDescription(desc);
StreamerOutputMsgBuilders::fillDescription(desc);
GlobalEvFOutputModuleType::fillDescription(desc);
desc.addUntracked<edm::InputTag>("psetMap", {"hltPSetMap"})
->setComment("Optionally allow the map of ParameterSets to be calculated externally.");
descriptions.add("globalEvfOutputModule", desc);
}

std::unique_ptr<StreamerOutputModuleCommon> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
return std::make_unique<StreamerOutputModuleCommon>(
commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());
std::unique_ptr<SerializeDataBuffer> GlobalEvFOutputModule::beginStream(edm::StreamID) const {
return std::make_unique<SerializeDataBuffer>();
}

std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
//create run Cache holding JSON file writer and variables
auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
jsonDef->updateDestination(streamLabel_);
StreamerOutputModuleCommon streamerCommon(
commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());

//output INI file (non-const). This doesn't require globalBeginRun to be finished
const std::string openIniFileName = edm::Service<evf::EvFDaqDirector>()->getOpenInitFilePath(streamLabel_);
Expand All @@ -362,12 +400,13 @@ namespace evf {

auto psetMapHandle = run.getHandle(psetToken_);

SerializeDataBuffer buffer;
std::unique_ptr<InitMsgBuilder> init_message =
streamerCommon.serializeRegistry(*streamerCommon.getSerializerBuffer(),
OutputModule::processName(),
description().moduleLabel(),
moduleDescription().mainParameterSetID(),
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);
msgBuilders_->serializeRegistry(buffer,
OutputModule::processName(),
description().moduleLabel(),
moduleDescription().mainParameterSetID(),
psetMapHandle.isValid() ? psetMapHandle.product() : nullptr);

//Let us turn it into a View
InitMsgView view(init_message->startAddress());
Expand Down Expand Up @@ -398,9 +437,6 @@ namespace evf {
}
src.close();

//clear serialization buffers
streamerCommon.getSerializerBuffer()->clearHeaderBuffer();

//free output buffer needed only for the file write
outBuf.reset();

Expand All @@ -427,26 +463,40 @@ namespace evf {
edm::LuminosityBlockForOutput const& iLB) const {
auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);

auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock());
auto ret = std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock(), metaDataCache_);
lastWriter_ = ret.get();
return ret;
}

StreamerOutputModuleCommon streamerCommon(
void GlobalEvFOutputModule::beginJob() {
msgBuilders_ = std::make_unique<StreamerOutputMsgBuilders>(
commonParameters_, &keptProducts()[edm::InEvent], description().moduleLabel());

auto msg = streamerCommon.serializeEventMetaData(
*streamerCommon.getSerializerBuffer(), *branchIDLists(), *thinnedAssociationsHelper());
cacheEventMetaData();
}

ret->doOutputEvent(*msg);
return ret;
void GlobalEvFOutputModule::respondToOpenInputFile(edm::FileBlock const&) {
if (branchIDLists()->size() != presentBranchIDListSize_) {
cacheEventMetaData();
if (lastWriter_) {
lastWriter_.load()->setMetaCache(metaDataCache_);
}
}
}

void GlobalEvFOutputModule::cacheEventMetaData() {
metaDataCache_ = std::make_shared<MetaDataCache>(*msgBuilders_, *branchIDLists(), *thinnedAssociationsHelper());
}

void GlobalEvFOutputModule::acquire(edm::StreamID id,
edm::EventForOutput const& e,
edm::WaitingTaskWithArenaHolder iHolder) const {
edm::Handle<edm::TriggerResults> const& triggerResults = getTriggerResults(trToken_, e);

auto streamerCommon = streamCache(id);
std::cout << " writing Event " << moduleDescription().moduleLabel() << std::endl;
auto buffer = streamCache(id);
std::unique_ptr<EventMsgBuilder> msg =
streamerCommon->serializeEvent(*streamerCommon->getSerializerBuffer(), e, triggerResults, selectorConfig());
msgBuilders_->serializeEvent(*buffer, e, triggerResults, selectorConfig(), metaDataCache_->checksum_);

auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index());
const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)
Expand Down
1 change: 1 addition & 0 deletions EventFilter/Utilities/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
</library>

<test name="BUFU_TEST" command="RunBUFU.sh"/>
<test name="EventFilterUtilitiesReadStreamerFile" command="TestReadingStreamerFile.sh"/>
2 changes: 1 addition & 1 deletion EventFilter/Utilities/test/RunBUFU.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ cat data/run${runnumber}/run${runnumber}_ls0001_streamDQM_pid*.dat >> dqmdisk/ru
find dqmdisk
echo '{"data": [12950, 1620, 0, "run'${runnumber}'_ls0001_streamDQM_test.dat", 40823782, 1999348078, 135, 13150, 0, "Failsafe"]}' > dqmdisk/run${runnumber}/run${runnumber}_ls0001_streamDQM_test.jsn

CMDLINE_STARTDQM="cmsRun test_dqmstream.py runInputDir=./dqmdisk runNumber=100101"
CMDLINE_STARTDQM="cmsRun test_dqmstream.py runInputDir=./dqmdisk runNumber=100101 maxLS=1 eventsPerLS=35"
${CMDLINE_STARTDQM} > out_2_dqm.log 2>&1 || diedqm "${CMDLINE_STARTDQM}" $? $OUTDIR

rm -rf $OUTDIR/{ramdisk,data,*.log}
Expand Down
Loading

0 comments on commit d9b3643

Please sign in to comment.