From 3543dbaac2be1cc2e21451e100f24d1bc116cd8b Mon Sep 17 00:00:00 2001 From: Srecko Date: Fri, 2 Feb 2024 16:17:54 +0100 Subject: [PATCH] - remove EvFOutputModule which was replaced by GlobalEvFOutputModule - clean up unused functionality of EvFDaqDirector (reading PSets from the menu which was never used) --- .../Utilities/interface/EvFDaqDirector.h | 13 +- .../Utilities/interface/EvFOutputModule.h | 108 ------- EventFilter/Utilities/plugins/modules.cc | 2 - EventFilter/Utilities/src/EvFDaqDirector.cc | 155 +--------- EventFilter/Utilities/src/EvFOutputModule.cc | 289 ------------------ .../Utilities/src/FastMonitoringService.cc | 5 +- EventFilter/Utilities/test/startFU.py | 2 +- .../Utilities/test/start_multiLS_FU.py | 4 +- EventFilter/Utilities/test/unittest_FU.py | 2 +- .../Utilities/test/unittest_FU_daqsource.py | 6 +- 10 files changed, 13 insertions(+), 573 deletions(-) delete mode 100644 EventFilter/Utilities/interface/EvFOutputModule.h delete mode 100644 EventFilter/Utilities/src/EvFOutputModule.cc diff --git a/EventFilter/Utilities/interface/EvFDaqDirector.h b/EventFilter/Utilities/interface/EvFDaqDirector.h index 45886d16b0f83..18dbfa174d22b 100644 --- a/EventFilter/Utilities/interface/EvFDaqDirector.h +++ b/EventFilter/Utilities/interface/EvFDaqDirector.h @@ -68,7 +68,6 @@ namespace evf { void initRun(); static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); void preallocate(edm::service::SystemBounds const& bounds); - void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&); void preBeginRun(edm::GlobalContext const& globalContext); void postEndRun(edm::GlobalContext const& globalContext); void preGlobalEndLumi(edm::GlobalContext const& globalContext); @@ -186,10 +185,8 @@ namespace evf { filesToDeletePtr_ = filesToDelete; } - void checkTransferSystemPSet(edm::ProcessContext const& pc); - void checkMergeTypePSet(edm::ProcessContext const& pc); - std::string getStreamDestinations(std::string const& stream) const; - std::string getStreamMergeType(std::string const& stream, MergeType defaultType); + std::string getStreamDestinations(std::string const&) const { return std::string(""); } + std::string getStreamMergeType(std::string const&, MergeType defaultType) const { return MergeTypeNames_[defaultType]; } static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid); bool inputThrottled(); bool lumisectionDiscarded(unsigned int ls); @@ -225,9 +222,6 @@ namespace evf { bool fileBrokerUseLocalLock_; unsigned int fuLockPollInterval_; bool outputAdler32Recheck_; - bool requireTSPSet_; - std::string selectedTransferMode_; - std::string mergeTypePset_; bool directorBU_; std::string hltSourceDirectory_; @@ -282,9 +276,6 @@ namespace evf { std::string stopFilePathPid_; unsigned int stop_ls_override_ = 0; - std::shared_ptr transferSystemJson_; - tbb::concurrent_hash_map mergeTypeMap_; - //values initialized in .cc file static const std::vector MergeTypeNames_; diff --git a/EventFilter/Utilities/interface/EvFOutputModule.h b/EventFilter/Utilities/interface/EvFOutputModule.h deleted file mode 100644 index e65f7eb5636d2..0000000000000 --- a/EventFilter/Utilities/interface/EvFOutputModule.h +++ /dev/null @@ -1,108 +0,0 @@ -#ifndef EventFilter_Utilities_EvFOutputModule_h -#define EventFilter_Utilities_EvFOutputModule_h - -#include "IOPool/Streamer/interface/StreamerOutputFile.h" -#include "FWCore/Framework/interface/one/OutputModule.h" -#include "IOPool/Streamer/interface/StreamerOutputModuleCommon.h" -#include "FWCore/Utilities/interface/EDGetToken.h" -#include "DataFormats/Streamer/interface/StreamedProducts.h" - -#include "EventFilter/Utilities/interface/JsonMonitorable.h" -#include "EventFilter/Utilities/interface/FastMonitor.h" -# -typedef edm::detail::TriggerResultsBasedEventSelector::handle_t Trig; - -namespace evf { - - class FastMonitoringService; - // class EvFOutputEventWriter; - - class EvFOutputEventWriter { - public: - explicit EvFOutputEventWriter(std::string const& filePath) - : filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {} - - ~EvFOutputEventWriter() {} - - void close() { stream_writer_events_->close(); } - - void doOutputEvent(EventMsgBuilder const& msg) { - EventMsgView eview(msg.startAddress()); - stream_writer_events_->write(eview); - } - - uint32 get_adler32() const { return stream_writer_events_->adler32(); } - - std::string const& getFilePath() const { return filePath_; } - - unsigned long getAccepted() const { return accepted_; } - void incAccepted() { accepted_++; } - - private: - std::string filePath_; - unsigned long accepted_; - edm::propagate_const> stream_writer_events_; - }; - - class EvFOutputJSONWriter { - public: - EvFOutputJSONWriter(edm::StreamerOutputModuleCommon::Parameters const& commonParameters, - edm::SelectedProducts const* selections, - std::string const& streamLabel, - std::string const& moduleLabel); - - edm::StreamerOutputModuleCommon streamerCommon_; - - jsoncollector::IntJ processed_; - jsoncollector::IntJ accepted_; - jsoncollector::IntJ errorEvents_; - jsoncollector::IntJ retCodeMask_; - jsoncollector::StringJ filelist_; - jsoncollector::IntJ filesize_; - jsoncollector::StringJ inputFiles_; - jsoncollector::IntJ fileAdler32_; - jsoncollector::StringJ transferDestination_; - jsoncollector::StringJ mergeType_; - jsoncollector::IntJ hltErrorEvents_; - std::shared_ptr jsonMonitor_; - jsoncollector::DataPointDefinition outJsonDef_; - }; - - typedef edm::one::OutputModule> - EvFOutputModuleType; - - class EvFOutputModule : public EvFOutputModuleType { - public: - explicit EvFOutputModule(edm::ParameterSet const& ps); - ~EvFOutputModule() override; - static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); - - private: - void beginRun(edm::RunForOutput const& run) override; - void write(edm::EventForOutput const& e) override; - - //pure in parent class but unused here - void writeLuminosityBlock(edm::LuminosityBlockForOutput const&) override {} - void writeRun(edm::RunForOutput const&) override {} - void endRun(edm::RunForOutput const&) override {} - - std::shared_ptr globalBeginLuminosityBlock( - edm::LuminosityBlockForOutput const& iLB) const override; - void globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) override; - - Trig getTriggerResults(edm::EDGetTokenT const& token, edm::EventForOutput const& e) const; - - edm::StreamerOutputModuleCommon::Parameters commonParameters_; - std::string streamLabel_; - edm::EDGetTokenT trToken_; - edm::EDGetTokenT psetToken_; - - evf::FastMonitoringService* fms_; - - std::unique_ptr jsonWriter_; - - }; //end-of-class-def - -} // namespace evf - -#endif diff --git a/EventFilter/Utilities/plugins/modules.cc b/EventFilter/Utilities/plugins/modules.cc index 106969f00a0b4..f0213d4fb2693 100644 --- a/EventFilter/Utilities/plugins/modules.cc +++ b/EventFilter/Utilities/plugins/modules.cc @@ -1,5 +1,4 @@ #include "EventFilter/Utilities/interface/EvFDaqDirector.h" -#include "EventFilter/Utilities/interface/EvFOutputModule.h" #include "EventFilter/Utilities/interface/FastMonitoringService.h" #include "EventFilter/Utilities/interface/FedRawDataInputSource.h" #include "EventFilter/Utilities/interface/DAQSource.h" @@ -29,7 +28,6 @@ DEFINE_FWK_SERVICE(EvFDaqDirector); DEFINE_FWK_MODULE(ExceptionGenerator); DEFINE_FWK_MODULE(EvFFEDSelector); DEFINE_FWK_MODULE(EvFFEDExcluder); -DEFINE_FWK_MODULE(EvFOutputModule); DEFINE_FWK_MODULE(DaqFakeReader); DEFINE_FWK_INPUT_SOURCE(FedRawDataInputSource); DEFINE_FWK_INPUT_SOURCE(DAQSource); diff --git a/EventFilter/Utilities/src/EvFDaqDirector.cc b/EventFilter/Utilities/src/EvFDaqDirector.cc index 91c7a4d46b866..78a285b814922 100644 --- a/EventFilter/Utilities/src/EvFDaqDirector.cc +++ b/EventFilter/Utilities/src/EvFDaqDirector.cc @@ -42,16 +42,13 @@ namespace evf { bu_base_dirs_all_(pset.getUntrackedParameter>("buBaseDirsAll")), run_(pset.getUntrackedParameter("runNumber")), useFileBroker_(pset.getUntrackedParameter("useFileBroker")), - fileBrokerHostFromCfg_(pset.getUntrackedParameter("fileBrokerHostFromCfg", true)), + fileBrokerHostFromCfg_(pset.getUntrackedParameter("fileBrokerHostFromCfg", false)), fileBrokerHost_(pset.getUntrackedParameter("fileBrokerHost", "InValid")), fileBrokerPort_(pset.getUntrackedParameter("fileBrokerPort", "8080")), fileBrokerKeepAlive_(pset.getUntrackedParameter("fileBrokerKeepAlive", true)), fileBrokerUseLocalLock_(pset.getUntrackedParameter("fileBrokerUseLocalLock", true)), fuLockPollInterval_(pset.getUntrackedParameter("fuLockPollInterval", 2000)), outputAdler32Recheck_(pset.getUntrackedParameter("outputAdler32Recheck", false)), - requireTSPSet_(pset.getUntrackedParameter("requireTransfersPSet", false)), - selectedTransferMode_(pset.getUntrackedParameter("selectedTransferMode", "")), - mergeTypePset_(pset.getUntrackedParameter("mergingPset", "")), directorBU_(pset.getUntrackedParameter("directorIsBU", false)), hltSourceDirectory_(pset.getUntrackedParameter("hltSourceDirectory", "")), hostname_(""), @@ -72,7 +69,6 @@ namespace evf { fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())), fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) { reg.watchPreallocate(this, &EvFDaqDirector::preallocate); - reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob); reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun); reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun); reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi); @@ -385,10 +381,6 @@ namespace evf { ->setComment("Lock polling interval in microseconds for the input directory file lock"); desc.addUntracked("outputAdler32Recheck", false) ->setComment("Check Adler32 of per-process output files while micro-merging"); - desc.addUntracked("requireTransfersPSet", false) - ->setComment("Require complete transferSystem PSet in the process configuration"); - desc.addUntracked("selectedTransferMode", "") - ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter"); desc.addUntracked("directorIsBU", false)->setComment("BU director mode used for testing"); desc.addUntracked("hltSourceDirectory", "")->setComment("BU director mode source directory"); desc.addUntracked("mergingPset", "") @@ -396,11 +388,6 @@ namespace evf { descriptions.add("EvFDaqDirector", desc); } - void EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) { - checkTransferSystemPSet(pc); - checkMergeTypePSet(pc); - } - void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) { //assert(run_ == id.run()); @@ -1987,146 +1974,6 @@ namespace evf { } //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams - void EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const& pc) { - if (transferSystemJson_) - return; - - transferSystemJson_.reset(new Json::Value); - edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID()); - if (topPset.existsAs("transferSystem", true)) { - const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem")); - - Json::Value destinationsVal(Json::arrayValue); - std::vector destinations = tsPset.getParameter>("destinations"); - for (auto& dest : destinations) - destinationsVal.append(dest); - (*transferSystemJson_)["destinations"] = destinationsVal; - - Json::Value modesVal(Json::arrayValue); - std::vector modes = tsPset.getParameter>("transferModes"); - for (auto& mode : modes) - modesVal.append(mode); - (*transferSystemJson_)["transferModes"] = modesVal; - - for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) { - if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") { - const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first); - Json::Value streamVal; - for (auto& mode : modes) { - //validation - if (!streamDef.existsAs>(mode, true)) - throw cms::Exception("EvFDaqDirector") - << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode - << ")"; - std::vector streamDestinations = streamDef.getParameter>(mode); - - Json::Value sDestsValue(Json::arrayValue); - - if (streamDestinations.empty()) - throw cms::Exception("EvFDaqDirector") - << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode; - - for (auto& sdest : streamDestinations) { - bool sDestValid = false; - sDestsValue.append(sdest); - for (auto& dest : destinations) { - if (dest == sdest) - sDestValid = true; - } - if (!sDestValid) - throw cms::Exception("EvFDaqDirector") - << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode - << ", dest:" << sdest; - } - streamVal[mode] = sDestsValue; - } - (*transferSystemJson_)[psKeyItr->first] = streamVal; - } - } - } else { - if (requireTSPSet_) - throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found"; - } - } - - std::string EvFDaqDirector::getStreamDestinations(std::string const& stream) const { - std::string streamRequestName; - if (transferSystemJson_->isMember(stream.c_str())) - streamRequestName = stream; - else { - std::stringstream msg; - msg << "Transfer system mode definitions missing for -: " << stream; - if (requireTSPSet_) - throw cms::Exception("EvFDaqDirector") << msg.str(); - else { - edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)"; - return std::string("Failsafe"); - } - } - //return empty if strict check parameter is not on - if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) { - edm::LogWarning("EvFDaqDirector") - << "Selected mode string is not provided as DaqDirector parameter." - << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string."; - return std::string("Failsafe"); - } - if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) { - throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter."; - } - //check if stream has properly listed transfer stream - if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) { - std::stringstream msg; - msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName; - if (requireTSPSet_) - throw cms::Exception("EvFDaqDirector") << msg.str(); - else - edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)"; - return std::string("Failsafe"); - } - Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, ""); - - //flatten string json::Array into CSV std::string - std::string ret; - for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) { - if (!ret.empty()) - ret += ","; - ret += (*it).asString(); - } - return ret; - } - - void EvFDaqDirector::checkMergeTypePSet(edm::ProcessContext const& pc) { - if (mergeTypePset_.empty()) - return; - if (!mergeTypeMap_.empty()) - return; - edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID()); - if (topPset.existsAs(mergeTypePset_, true)) { - const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_)); - for (const std::string& pname : tsPset.getParameterNames()) { - std::string streamType = tsPset.getParameter(pname); - tbb::concurrent_hash_map::accessor ac; - mergeTypeMap_.insert(ac, pname); - ac->second = streamType; - ac.release(); - } - } - } - - std::string EvFDaqDirector::getStreamMergeType(std::string const& stream, MergeType defaultType) { - tbb::concurrent_hash_map::const_accessor search_ac; - if (mergeTypeMap_.find(search_ac, stream)) - return search_ac->second; - - edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value"; - std::string defaultName = MergeTypeNames_[defaultType]; - tbb::concurrent_hash_map::accessor ac; - mergeTypeMap_.insert(ac, stream); - ac->second = defaultName; - ac.release(); - return defaultName; - } - void EvFDaqDirector::createProcessingNotificationMaybe() const { std::string proc_flag = run_dir_ + "/processing"; int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); diff --git a/EventFilter/Utilities/src/EvFOutputModule.cc b/EventFilter/Utilities/src/EvFOutputModule.cc deleted file mode 100644 index 56581c4864fe0..0000000000000 --- a/EventFilter/Utilities/src/EvFOutputModule.cc +++ /dev/null @@ -1,289 +0,0 @@ -#include "EventFilter/Utilities/interface/EvFOutputModule.h" - -#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" - -#include "FWCore/ServiceRegistry/interface/Service.h" -#include "EventFilter/Utilities/interface/FastMonitoringService.h" -#include "EventFilter/Utilities/interface/EvFDaqDirector.h" - -#include "EventFilter/Utilities/interface/JSONSerializer.h" -#include "EventFilter/Utilities/interface/FileIO.h" -#include "FWCore/Utilities/interface/Adler32Calculator.h" - -#include "FWCore/Framework/interface/EventForOutput.h" -#include "FWCore/Framework/interface/LuminosityBlockForOutput.h" -#include "FWCore/Framework/interface/LuminosityBlock.h" - -#include "IOPool/Streamer/interface/InitMsgBuilder.h" -#include "IOPool/Streamer/interface/EventMsgBuilder.h" -#include "FWCore/Utilities/interface/UnixSignalHandlers.h" - -#include -#include -#include - -namespace evf { - - EvFOutputJSONWriter::EvFOutputJSONWriter(edm::StreamerOutputModuleCommon::Parameters const& commonParameters, - edm::SelectedProducts const* selections, - std::string const& streamLabel, - std::string const& moduleLabel) - : streamerCommon_(commonParameters, selections, moduleLabel), - processed_(0), - accepted_(0), - errorEvents_(0), - retCodeMask_(0), - filelist_(), - filesize_(0), - inputFiles_(), - fileAdler32_(1), - hltErrorEvents_(0) { - transferDestination_ = edm::Service()->getStreamDestinations(streamLabel); - mergeType_ = edm::Service()->getStreamMergeType(streamLabel, evf::MergeTypeDAT); - - std::string baseRunDir = edm::Service()->baseRunDir(); - LogDebug("EvFOutputModule") << "writing .dat files to -: " << baseRunDir; - - edm::Service()->createRunOpendirMaybe(); - - processed_.setName("Processed"); - accepted_.setName("Accepted"); - errorEvents_.setName("ErrorEvents"); - retCodeMask_.setName("ReturnCodeMask"); - filelist_.setName("Filelist"); - filesize_.setName("Filesize"); - inputFiles_.setName("InputFiles"); - fileAdler32_.setName("FileAdler32"); - transferDestination_.setName("TransferDestination"); - mergeType_.setName("MergeType"); - hltErrorEvents_.setName("HLTErrorEvents"); - - outJsonDef_.setDefaultGroup("data"); - outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM); - outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM); - outJsonDef_.addLegendItem("ErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM); - outJsonDef_.addLegendItem("ReturnCodeMask", "integer", jsoncollector::DataPointDefinition::BINARYOR); - outJsonDef_.addLegendItem("Filelist", "string", jsoncollector::DataPointDefinition::MERGE); - outJsonDef_.addLegendItem("Filesize", "integer", jsoncollector::DataPointDefinition::SUM); - outJsonDef_.addLegendItem("InputFiles", "string", jsoncollector::DataPointDefinition::CAT); - outJsonDef_.addLegendItem("FileAdler32", "integer", jsoncollector::DataPointDefinition::ADLER32); - outJsonDef_.addLegendItem("TransferDestination", "string", jsoncollector::DataPointDefinition::SAME); - outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME); - outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM); - - std::stringstream tmpss, ss; - tmpss << baseRunDir << "/open/" - << "output_" << getpid() << ".jsd"; - ss << baseRunDir << "/" - << "output_" << getpid() << ".jsd"; - std::string outTmpJsonDefName = tmpss.str(); - std::string outJsonDefName = ss.str(); - - edm::Service()->lockInitLock(); - struct stat fstat; - if (stat(outJsonDefName.c_str(), &fstat) != 0) { //file does not exist - LogDebug("EvFOutputModule") << "writing output definition file -: " << outJsonDefName; - std::string content; - jsoncollector::JSONSerializer::serialize(&outJsonDef_, content); - jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content); - std::filesystem::rename(outTmpJsonDefName, outJsonDefName); - } - edm::Service()->unlockInitLock(); - - jsonMonitor_.reset(new jsoncollector::FastMonitor(&outJsonDef_, true)); - jsonMonitor_->setDefPath(outJsonDefName); - jsonMonitor_->registerGlobalMonitorable(&processed_, false); - jsonMonitor_->registerGlobalMonitorable(&accepted_, false); - jsonMonitor_->registerGlobalMonitorable(&errorEvents_, false); - jsonMonitor_->registerGlobalMonitorable(&retCodeMask_, false); - jsonMonitor_->registerGlobalMonitorable(&filelist_, false); - jsonMonitor_->registerGlobalMonitorable(&filesize_, false); - jsonMonitor_->registerGlobalMonitorable(&inputFiles_, false); - jsonMonitor_->registerGlobalMonitorable(&fileAdler32_, false); - jsonMonitor_->registerGlobalMonitorable(&transferDestination_, false); - jsonMonitor_->registerGlobalMonitorable(&mergeType_, false); - jsonMonitor_->registerGlobalMonitorable(&hltErrorEvents_, false); - jsonMonitor_->commit(nullptr); - } - - EvFOutputModule::EvFOutputModule(edm::ParameterSet const& ps) - : edm::one::OutputModuleBase(ps), - EvFOutputModuleType(ps), - commonParameters_(edm::StreamerOutputModuleCommon::parameters(ps)), - streamLabel_(ps.getParameter("@module_label")), - trToken_(consumes(edm::InputTag("TriggerResults"))), - psetToken_(consumes( - ps.getUntrackedParameter("psetMap"))) { - //replace hltOutoputA with stream if the HLT menu uses this convention - std::string testPrefix = "hltOutput"; - if (streamLabel_.find(testPrefix) == 0) - streamLabel_ = std::string("stream") + streamLabel_.substr(testPrefix.size()); - - if (streamLabel_.find('_') != std::string::npos) { - throw cms::Exception("EvFOutputModule") << "Underscore character is reserved can not be used for stream names in " - "FFF, but was detected in stream name -: " - << streamLabel_; - } - - std::string streamLabelLow = streamLabel_; - boost::algorithm::to_lower(streamLabelLow); - auto streampos = streamLabelLow.rfind("stream"); - if (streampos != 0 && streampos != std::string::npos) - throw cms::Exception("EvFOutputModule") - << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for " - "names in FFF based HLT, but was detected in stream name"; - - fms_ = (evf::FastMonitoringService*)(edm::Service().operator->()); - } - - EvFOutputModule::~EvFOutputModule() {} - - void EvFOutputModule::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { - edm::ParameterSetDescription desc; - edm::StreamerOutputModuleCommon::fillDescription(desc); - EvFOutputModuleType::fillDescription(desc); - desc.addUntracked("psetMap", {"hltPSetMap"}) - ->setComment("Optionally allow the map of ParameterSets to be calculated externally."); - descriptions.add("evfOutputModule", desc); - } - - void EvFOutputModule::beginRun(edm::RunForOutput const& run) { - //create run Cache holding JSON file writer and variables - jsonWriter_ = std::make_unique( - commonParameters_, &keptProducts()[edm::InEvent], streamLabel_, description().moduleLabel()); - - //output INI file (non-const). This doesn't require globalBeginRun to be finished - const std::string openIniFileName = edm::Service()->getOpenInitFilePath(streamLabel_); - edm::LogInfo("EvFOutputModule") << "beginRun init stream -: " << openIniFileName; - - StreamerOutputFile stream_writer_preamble(openIniFileName); - uint32 preamble_adler32 = 1; - edm::BranchIDLists const* bidlPtr = branchIDLists(); - - auto psetMapHandle = run.getHandle(psetToken_); - - std::unique_ptr init_message = - jsonWriter_->streamerCommon_.serializeRegistry(*jsonWriter_->streamerCommon_.getSerializerBuffer(), - *bidlPtr, - *thinnedAssociationsHelper(), - OutputModule::processName(), - description().moduleLabel(), - moduleDescription().mainParameterSetID(), - psetMapHandle.isValid() ? psetMapHandle.product() : nullptr); - - //Let us turn it into a View - InitMsgView view(init_message->startAddress()); - - //output header - stream_writer_preamble.write(view); - preamble_adler32 = stream_writer_preamble.adler32(); - stream_writer_preamble.close(); - - struct stat istat; - stat(openIniFileName.c_str(), &istat); - //read back file to check integrity of what was written - off_t readInput = 0; - uint32_t adlera = 1, adlerb = 0; - FILE* src = fopen(openIniFileName.c_str(), "r"); - - //allocate buffer to write INI file - unsigned char* outBuf = new unsigned char[1024 * 1024]; - while (readInput < istat.st_size) { - size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput; - fread(outBuf, toRead, 1, src); - cms::Adler32((const char*)outBuf, toRead, adlera, adlerb); - readInput += toRead; - } - fclose(src); - - //clear serialization buffers - jsonWriter_->streamerCommon_.getSerializerBuffer()->clearHeaderBuffer(); - - //free output buffer needed only for the file write - delete[] outBuf; - outBuf = nullptr; - - uint32_t adler32c = (adlerb << 16) | adlera; - if (adler32c != preamble_adler32) { - throw cms::Exception("EvFOutputModule") << "Checksum mismatch of ini file -: " << openIniFileName - << " expected:" << preamble_adler32 << " obtained:" << adler32c; - } else { - LogDebug("EvFOutputModule") << "Ini file checksum -: " << streamLabel_ << " " << adler32c; - std::filesystem::rename(openIniFileName, edm::Service()->getInitFilePath(streamLabel_)); - } - } - - Trig EvFOutputModule::getTriggerResults(edm::EDGetTokenT const& token, - edm::EventForOutput const& e) const { - Trig result; - e.getByToken(token, result); - return result; - } - - std::shared_ptr EvFOutputModule::globalBeginLuminosityBlock( - edm::LuminosityBlockForOutput const& iLB) const { - auto openDatFilePath = edm::Service()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_); - - return std::make_shared(openDatFilePath); - } - - void EvFOutputModule::write(edm::EventForOutput const& e) { - unsigned int counter = 0; - while (edm::Service()->inputThrottled()) { - if (edm::shutdown_flag.load(std::memory_order_relaxed)) - break; - if (!(counter % 100)) - edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused..."; - usleep(100000); - counter++; - } - - edm::Handle const& triggerResults = getTriggerResults(trToken_, e); - - //auto lumiWriter = const_cast(luminosityBlockCache(e.getLuminosityBlock().index() )); - auto lumiWriter = luminosityBlockCache(e.getLuminosityBlock().index()); - std::unique_ptr msg = jsonWriter_->streamerCommon_.serializeEvent( - *jsonWriter_->streamerCommon_.getSerializerBuffer(), e, triggerResults, selectorConfig()); - lumiWriter->incAccepted(); - lumiWriter->doOutputEvent(*msg); //msg is written and discarded at this point - } - - void EvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) { - auto lumiWriter = luminosityBlockCache(iLB.index()); - //close dat file - lumiWriter->close(); - - jsonWriter_->fileAdler32_.value() = lumiWriter->get_adler32(); - jsonWriter_->accepted_.value() = lumiWriter->getAccepted(); - - bool abortFlag = false; - jsonWriter_->processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag); - if (abortFlag) { - edm::LogInfo("EvFOutputModule") << "Abort flag has been set. Output is suppressed"; - return; - } - - if (jsonWriter_->processed_.value() != 0) { - struct stat istat; - std::filesystem::path openDatFilePath = lumiWriter->getFilePath(); - stat(openDatFilePath.string().c_str(), &istat); - jsonWriter_->filesize_ = istat.st_size; - std::filesystem::rename(openDatFilePath.string().c_str(), - edm::Service()->getDatFilePath(iLB.luminosityBlock(), streamLabel_)); - jsonWriter_->filelist_ = openDatFilePath.filename().string(); - } else { - //remove empty file when no event processing has occurred - remove(lumiWriter->getFilePath().c_str()); - jsonWriter_->filesize_ = 0; - jsonWriter_->filelist_ = ""; - jsonWriter_->fileAdler32_.value() = -1; //no files in signed long - } - - //produce JSON file - jsonWriter_->jsonMonitor_->snap(iLB.luminosityBlock()); - const std::string outputJsonNameStream = - edm::Service()->getOutputJsonFilePath(iLB.luminosityBlock(), streamLabel_); - jsonWriter_->jsonMonitor_->outputFullJSON(outputJsonNameStream, iLB.luminosityBlock()); - } - -} // namespace evf diff --git a/EventFilter/Utilities/src/FastMonitoringService.cc b/EventFilter/Utilities/src/FastMonitoringService.cc index 0cf41da56bc00..dc892bac9a955 100644 --- a/EventFilter/Utilities/src/FastMonitoringService.cc +++ b/EventFilter/Utilities/src/FastMonitoringService.cc @@ -457,8 +457,9 @@ namespace evf { //build a map of modules keyed by their module description address //here we need to treat output modules in a special way so they can be easily singled out - if (desc.moduleName() == "Stream" || desc.moduleName() == "GlobalEvFOutputModule" || - desc.moduleName() == "EvFOutputModule" || desc.moduleName() == "EventStreamFileWriter" || + if (desc.moduleName() == "Stream" || + desc.moduleName() == "GlobalEvFOutputModule" || + desc.moduleName() == "EventStreamFileWriter" || desc.moduleName() == "PoolOutputModule") { fmt_->m_data.encModule_.updateReserved((void*)&desc); nOutputModules_++; diff --git a/EventFilter/Utilities/test/startFU.py b/EventFilter/Utilities/test/startFU.py index 7a72c69fc6a35..870fe38d0542d 100644 --- a/EventFilter/Utilities/test/startFU.py +++ b/EventFilter/Utilities/test/startFU.py @@ -127,7 +127,7 @@ process.HLT_Physics = cms.Path(process.a*process.tcdsRawToDigi*process.filter1) process.HLT_Muon = cms.Path(process.b*process.filter2) -process.streamA = cms.OutputModule("EvFOutputModule", +process.streamA = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'HLT_Physics' )) ) diff --git a/EventFilter/Utilities/test/start_multiLS_FU.py b/EventFilter/Utilities/test/start_multiLS_FU.py index 1a81e5e00ee27..f50736f2d80c6 100644 --- a/EventFilter/Utilities/test/start_multiLS_FU.py +++ b/EventFilter/Utilities/test/start_multiLS_FU.py @@ -123,11 +123,11 @@ process.p1 = cms.Path(process.a*process.filter1) process.p2 = cms.Path(process.b*process.filter2) -process.streamA = cms.OutputModule("EvFOutputModule", +process.streamA = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p1' )) ) -process.streamB = cms.OutputModule("EvFOutputModule", +process.streamB = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) ) diff --git a/EventFilter/Utilities/test/unittest_FU.py b/EventFilter/Utilities/test/unittest_FU.py index 91a661228dbac..47fb7cb4db92a 100644 --- a/EventFilter/Utilities/test/unittest_FU.py +++ b/EventFilter/Utilities/test/unittest_FU.py @@ -138,7 +138,7 @@ process.HLT_Physics = cms.Path(process.a*process.tcdsRawToDigi*process.filter1) process.HLT_Muon = cms.Path(process.b*process.filter2) -process.streamA = cms.OutputModule("EvFOutputModule", +process.streamA = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'HLT_Physics' )) ) diff --git a/EventFilter/Utilities/test/unittest_FU_daqsource.py b/EventFilter/Utilities/test/unittest_FU_daqsource.py index afbd801029eef..84ae29bfd8fd6 100644 --- a/EventFilter/Utilities/test/unittest_FU_daqsource.py +++ b/EventFilter/Utilities/test/unittest_FU_daqsource.py @@ -145,15 +145,15 @@ process.p1 = cms.Path(process.a*process.tcdsRawToDigi*process.filter1) process.p2 = cms.Path(process.b*process.filter2) -process.streamA = cms.OutputModule("EvFOutputModule", +process.streamA = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p1' )) ) -process.streamB = cms.OutputModule("EvFOutputModule", +process.streamB = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) ) -process.streamC = cms.OutputModule("EvFOutputModule", +process.streamC = cms.OutputModule("GlobalEvFOutputModule", SelectEvents = cms.untracked.PSet(SelectEvents = cms.vstring( 'p2' )) )