diff --git a/DQMServices/FileIO/plugins/DQMFileSaverPB.cc b/DQMServices/FileIO/plugins/DQMFileSaverPB.cc index caf067eb07014..5e0f0d77fa3e2 100644 --- a/DQMServices/FileIO/plugins/DQMFileSaverPB.cc +++ b/DQMServices/FileIO/plugins/DQMFileSaverPB.cc @@ -43,6 +43,17 @@ DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(p if (tag_ != "UNKNOWN") { streamLabel_ = "DQMLive"; } + + if (!fakeFilterUnitMode_) { + if (!edm::Service().isAvailable()) + throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available"; + std::string initFileName = edm::Service()->getInitFilePath(streamLabel_); + std::ofstream file(initFileName); + if (!file) + throw cms::Exception("DQMFileSaverPB") + << "Cannot create INI file: " << initFileName << " error: " << strerror(errno); + file.close(); + } } DQMFileSaverPB::~DQMFileSaverPB() = default; @@ -52,13 +63,6 @@ void DQMFileSaverPB::initRun() const { transferDestination_ = edm::Service()->getStreamDestinations(streamLabel_); mergeType_ = edm::Service()->getStreamMergeType(streamLabel_, evf::MergeTypePB); } - - if (!fakeFilterUnitMode_) { - evf::EvFDaqDirector* daqDirector = (evf::EvFDaqDirector*)(edm::Service().operator->()); - const std::string initFileName = daqDirector->getInitFilePath(streamLabel_); - std::ofstream file(initFileName); - file.close(); - } } void DQMFileSaverPB::saveLumi(const FileParameters& fp) const { diff --git a/EventFilter/Utilities/interface/EvFDaqDirector.h b/EventFilter/Utilities/interface/EvFDaqDirector.h index a8b88de5cd40a..3aa2a8b5921d5 100644 --- a/EventFilter/Utilities/interface/EvFDaqDirector.h +++ b/EventFilter/Utilities/interface/EvFDaqDirector.h @@ -91,6 +91,7 @@ namespace evf { std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const; std::string getOpenInitFilePath(std::string const& stream) const; std::string getInitFilePath(std::string const& stream) const; + std::string getInitTempFilePath(std::string const& stream) const; std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const; std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const; std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const; @@ -120,6 +121,7 @@ namespace evf { void unlockInitLock(); void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; } bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; } + unsigned int numConcurrentLumis() const { return nConcurrentLumis_; } void lockFULocal(); void unlockFULocal(); void lockFULocal2(); @@ -185,6 +187,7 @@ namespace evf { std::string getStreamMergeType(std::string const& stream, MergeType defaultType); static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid); bool inputThrottled(); + bool lumisectionDiscarded(unsigned int ls); private: bool bumpFile(unsigned int& ls, @@ -263,6 +266,7 @@ namespace evf { unsigned int nStreams_ = 0; unsigned int nThreads_ = 0; + unsigned int nConcurrentLumis_ = 0; bool readEolsDefinition_ = true; unsigned int eolsNFilesIndex_ = 1; @@ -286,6 +290,7 @@ namespace evf { std::unique_ptr socket_; std::string input_throttled_file_; + std::string discard_ls_filestem_; }; } // namespace evf diff --git a/EventFilter/Utilities/interface/FFFNamingSchema.h b/EventFilter/Utilities/interface/FFFNamingSchema.h index bb3d66d0450c4..d681ab6f3ddb1 100644 --- a/EventFilter/Utilities/interface/FFFNamingSchema.h +++ b/EventFilter/Utilities/interface/FFFNamingSchema.h @@ -59,6 +59,13 @@ namespace fffnaming { return ss.str(); } + inline std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const& stream) { + std::stringstream ss; + runLumiPrefixFill(ss, run, ls); + ss << "_" << stream << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".initemp"; + return ss.str(); + } + inline std::string initFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const& stream, diff --git a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc index 14875e69f6849..48e38233cefc2 100644 --- a/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc +++ b/EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc @@ -43,12 +43,15 @@ namespace evf { class GlobalEvFOutputEventWriter { public: - explicit GlobalEvFOutputEventWriter(std::string const& filePath) - : filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {} + explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls) + : filePath_(filePath), ls_(ls), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {} ~GlobalEvFOutputEventWriter() {} - void close() { stream_writer_events_->close(); } + bool close() { + stream_writer_events_->close(); + return (discarded_ || edm::Service()->lumisectionDiscarded(ls_)); + } void doOutputEvent(EventMsgBuilder const& msg) { EventMsgView eview(msg.startAddress()); @@ -58,6 +61,12 @@ namespace evf { void doOutputEventAsync(std::unique_ptr msg, edm::WaitingTaskHolder iHolder) { throttledCheck(); + discardedCheck(); + if (discarded_) { + incAccepted(); + msg.reset(); + return; + } auto group = iHolder.group(); writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() { try { @@ -72,13 +81,24 @@ namespace evf { inline void throttledCheck() { unsigned int counter = 0; - while (edm::Service()->inputThrottled()) { + while (edm::Service()->inputThrottled() && !discarded_) { if (edm::shutdown_flag.load(std::memory_order_relaxed)) break; if (!(counter % 100)) edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused..."; usleep(100000); counter++; + if (edm::Service()->lumisectionDiscarded(ls_)) { + edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_; + discarded_ = true; + } + } + } + + inline void discardedCheck() { + if (!discarded_ && edm::Service()->lumisectionDiscarded(ls_)) { + edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_; + discarded_ = true; } } @@ -93,14 +113,17 @@ namespace evf { private: std::string filePath_; + const unsigned ls_; std::atomic accepted_; edm::propagate_const> stream_writer_events_; edm::SerialTaskQueue writeQueue_; + bool discarded_ = false; }; class GlobalEvFOutputJSONDef { public: - GlobalEvFOutputJSONDef(std::string const& streamLabel); + GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd); + void updateDestination(std::string const& streamLabel); jsoncollector::DataPointDefinition outJsonDef_; std::string outJsonDefName_; @@ -170,12 +193,10 @@ namespace evf { }; //end-of-class-def - GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel) { + GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) { std::string baseRunDir = edm::Service()->baseRunDir(); LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir; - edm::Service()->createRunOpendirMaybe(); - outJsonDef_.setDefaultGroup("data"); outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM); outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM); @@ -189,25 +210,31 @@ namespace evf { outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME); outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM); - std::stringstream tmpss, ss; - tmpss << baseRunDir << "/open/" - << "output_" << getpid() << ".jsd"; + std::stringstream ss; ss << baseRunDir << "/" << "output_" << getpid() << ".jsd"; - std::string outTmpJsonDefName = tmpss.str(); outJsonDefName_ = ss.str(); - edm::Service()->lockInitLock(); - struct stat fstat; - if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist - LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_; - std::string content; - jsoncollector::JSONSerializer::serialize(&outJsonDef_, content); - jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content); - std::filesystem::rename(outTmpJsonDefName, outJsonDefName_); + if (writeJsd) { + std::stringstream tmpss; + tmpss << baseRunDir << "/open/" + << "output_" << getpid() << ".jsd"; + std::string outTmpJsonDefName = tmpss.str(); + edm::Service()->createRunOpendirMaybe(); + edm::Service()->lockInitLock(); + struct stat fstat; + if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist + LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_; + std::string content; + jsoncollector::JSONSerializer::serialize(&outJsonDef_, content); + jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content); + std::filesystem::rename(outTmpJsonDefName, outJsonDefName_); + } } edm::Service()->unlockInitLock(); + } + void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) { transferDestination_ = edm::Service()->getStreamDestinations(streamLabel); mergeType_ = edm::Service()->getStreamMergeType(streamLabel, evf::MergeTypeDAT); } @@ -284,6 +311,21 @@ namespace evf { << "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for " "names in FFF based HLT, but was detected in stream name"; + //output initemp file. This lets hltd know number of streams early on + if (!edm::Service().isAvailable()) + throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available"; + + const std::string iniFileName = edm::Service()->getInitTempFilePath(streamLabel_); + std::ofstream file(iniFileName); + if (!file) + throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno); + file.close(); + + edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName; + + //create JSD + GlobalEvFOutputJSONDef(streamLabel_, true); + fms_ = (evf::FastMonitoringService*)(edm::Service().operator->()); } @@ -305,8 +347,8 @@ namespace evf { std::shared_ptr GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const { //create run Cache holding JSON file writer and variables - auto jsonDef = std::make_unique(streamLabel_); - + auto jsonDef = std::make_unique(streamLabel_, false); + jsonDef->updateDestination(streamLabel_); edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel()); //output INI file (non-const). This doesn't require globalBeginRun to be finished @@ -341,17 +383,21 @@ namespace evf { //read back file to check integrity of what was written off_t readInput = 0; uint32_t adlera = 1, adlerb = 0; - FILE* src = fopen(openIniFileName.c_str(), "r"); + std::ifstream src(openIniFileName, std::ifstream::binary); + if (!src) + throw cms::Exception("GlobalEvFOutputModule") + << "can not read back " << openIniFileName << " error: " << strerror(errno); //allocate buffer to write INI file - std::unique_ptr outBuf = std::make_unique(1024 * 1024); + std::unique_ptr outBuf = std::make_unique(1024 * 1024); while (readInput < istat.st_size) { size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput; - fread(outBuf.get(), toRead, 1, src); - cms::Adler32(const_cast(reinterpret_cast(outBuf.get())), toRead, adlera, adlerb); + src.read(outBuf.get(), toRead); + //cms::Adler32(const_cast(reinterpret_cast(outBuf.get())), toRead, adlera, adlerb); + cms::Adler32(const_cast(outBuf.get()), toRead, adlera, adlerb); readInput += toRead; } - fclose(src); + src.close(); //clear serialization buffers streamerCommon.getSerializerBuffer()->clearHeaderBuffer(); @@ -382,7 +428,7 @@ namespace evf { edm::LuminosityBlockForOutput const& iLB) const { auto openDatFilePath = edm::Service()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_); - return std::make_shared(openDatFilePath); + return std::make_shared(openDatFilePath, iLB.luminosityBlock()); } void GlobalEvFOutputModule::acquire(edm::StreamID id, @@ -403,7 +449,7 @@ namespace evf { void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const { auto lumiWriter = luminosityBlockCache(iLB.index()); //close dat file - const_cast(lumiWriter)->close(); + const bool discarded = const_cast(lumiWriter)->close(); //auto jsonWriter = const_cast(runCache(iLB.getRun().index())); auto jsonDef = runCache(iLB.getRun().index()); @@ -417,7 +463,17 @@ namespace evf { jsonWriter.accepted_.value() = lumiWriter->getAccepted(); bool abortFlag = false; - jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag); + + if (!discarded) { + jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag); + } else { + jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag); + jsonWriter.processed_.value() = 0; + jsonWriter.accepted_.value() = 0; + edm::LogInfo("GlobalEvFOutputModule") + << "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock(); + } + if (abortFlag) { edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed"; return; diff --git a/EventFilter/Utilities/src/EvFDaqDirector.cc b/EventFilter/Utilities/src/EvFDaqDirector.cc index c5fb70f041706..13e179b19abc9 100644 --- a/EventFilter/Utilities/src/EvFDaqDirector.cc +++ b/EventFilter/Utilities/src/EvFDaqDirector.cc @@ -143,9 +143,7 @@ namespace evf { edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr); } } - } - void EvFDaqDirector::initRun() { std::stringstream ss; ss << "run" << std::setfill('0') << std::setw(6) << run_; run_string_ = ss.str(); @@ -154,10 +152,13 @@ namespace evf { run_nstring_ = ss.str(); run_dir_ = base_dir_ + "/" + run_string_; input_throttled_file_ = run_dir_ + "/input_throttle"; + discard_ls_filestem_ = run_dir_ + "/discard_ls"; ss = std::stringstream(); ss << getpid(); pid_ = ss.str(); + } + void EvFDaqDirector::initRun() { // check if base dir exists or create it accordingly int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); if (retval != 0 && errno != EEXIST) { @@ -322,6 +323,7 @@ namespace evf { nThreads_ = bounds.maxNumberOfStreams(); nStreams_ = bounds.maxNumberOfThreads(); + nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks(); } void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { @@ -446,6 +448,10 @@ namespace evf { return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream); } + std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const { + return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream); + } + std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const { return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream); @@ -2067,4 +2073,9 @@ namespace evf { return (stat(input_throttled_file_.c_str(), &buf) == 0); } + bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) { + struct stat buf; + return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0); + } + } // namespace evf diff --git a/EventFilter/Utilities/src/FedRawDataInputSource.cc b/EventFilter/Utilities/src/FedRawDataInputSource.cc index 3671ae8f30f9d..d79fb0980e2dd 100644 --- a/EventFilter/Utilities/src/FedRawDataInputSource.cc +++ b/EventFilter/Utilities/src/FedRawDataInputSource.cc @@ -847,7 +847,23 @@ void FedRawDataInputSource::readSupervisor() { while (daqDirector_->inputThrottled()) { if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) break; + + unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis(); + unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0; + unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1; + bool hasDiscardedLumi = false; + for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) { + if (daqDirector_->lumisectionDiscarded(i)) { + edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i; + hasDiscardedLumi = true; + break; + } + } + if (hasDiscardedLumi) + break; + setMonStateSup(inThrottled); + if (!(counter % 50)) edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused..."; usleep(100000); diff --git a/HLTrigger/JSONMonitoring/plugins/HLTriggerJSONMonitoring.cc b/HLTrigger/JSONMonitoring/plugins/HLTriggerJSONMonitoring.cc index f4e69e7146646..33d6a07b94c3e 100644 --- a/HLTrigger/JSONMonitoring/plugins/HLTriggerJSONMonitoring.cc +++ b/HLTrigger/JSONMonitoring/plugins/HLTriggerJSONMonitoring.cc @@ -148,7 +148,17 @@ class HLTriggerJSONMonitoring : public edm::global::EDAnalyzer< // constructor HLTriggerJSONMonitoring::HLTriggerJSONMonitoring(edm::ParameterSet const& config) : triggerResults_(config.getParameter("triggerResults")), - triggerResultsToken_(consumes(triggerResults_)) {} + triggerResultsToken_(consumes(triggerResults_)) { + if (edm::Service().isAvailable()) { + //output initemp file. This lets hltd know number of streams early + std::string initFileName = edm::Service()->getInitTempFilePath("streamHLTRates"); + std::ofstream file(initFileName); + if (!file) + throw cms::Exception("HLTriggerJsonMonitoring") + << "Cannot create INITEMP file: " << initFileName << " error: " << strerror(errno); + file.close(); + } +} // validate the configuration and optionally fill the default values void HLTriggerJSONMonitoring::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { diff --git a/HLTrigger/JSONMonitoring/plugins/L1TriggerJSONMonitoring.cc b/HLTrigger/JSONMonitoring/plugins/L1TriggerJSONMonitoring.cc index 3ca71bd4a7522..572d22dab0049 100644 --- a/HLTrigger/JSONMonitoring/plugins/L1TriggerJSONMonitoring.cc +++ b/HLTrigger/JSONMonitoring/plugins/L1TriggerJSONMonitoring.cc @@ -161,7 +161,17 @@ constexpr const std::array L1TriggerJSONMonitoring::tcdsTrigger L1TriggerJSONMonitoring::L1TriggerJSONMonitoring(edm::ParameterSet const& config) : level1Results_(config.getParameter("L1Results")), level1ResultsToken_(consumes(level1Results_)), - l1tUtmTriggerMenuRcdToken_(esConsumes()) {} + l1tUtmTriggerMenuRcdToken_(esConsumes()) { + if (edm::Service().isAvailable()) { + //output initemp file. This lets hltd know number of streams early + std::string initFileName = edm::Service()->getInitTempFilePath("streamL1Rates"); + std::ofstream file(initFileName); + if (!file) + throw cms::Exception("L1TriggerJsonMonitoring") + << "Cannot create INITEMP file: " << initFileName << " error: " << strerror(errno); + file.close(); + } +} // validate the configuration and optionally fill the default values void L1TriggerJSONMonitoring::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {