Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(DAQ) File based protocol update: "initemp" markers + "discardLS" feature [12_6_X] #40156

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions DQMServices/FileIO/plugins/DQMFileSaverPB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(p
if (tag_ != "UNKNOWN") {
streamLabel_ = "DQMLive";
}

if (!fakeFilterUnitMode_) {
if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available";
std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_);
std::ofstream file(initFileName);
if (!file)
throw cms::Exception("DQMFileSaverPB")
<< "Cannot create INI file: " << initFileName << " error: " << strerror(errno);
file.close();
}
}

DQMFileSaverPB::~DQMFileSaverPB() = default;
Expand All @@ -52,13 +63,6 @@ void DQMFileSaverPB::initRun() const {
transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypePB);
}

if (!fakeFilterUnitMode_) {
evf::EvFDaqDirector* daqDirector = (evf::EvFDaqDirector*)(edm::Service<evf::EvFDaqDirector>().operator->());
const std::string initFileName = daqDirector->getInitFilePath(streamLabel_);
std::ofstream file(initFileName);
file.close();
}
}

void DQMFileSaverPB::saveLumi(const FileParameters& fp) const {
Expand Down
5 changes: 5 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ namespace evf {
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
std::string getOpenInitFilePath(std::string const& stream) const;
std::string getInitFilePath(std::string const& stream) const;
std::string getInitTempFilePath(std::string const& stream) const;
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
Expand Down Expand Up @@ -120,6 +121,7 @@ namespace evf {
void unlockInitLock();
void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
void lockFULocal();
void unlockFULocal();
void lockFULocal2();
Expand Down Expand Up @@ -185,6 +187,7 @@ namespace evf {
std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
bool inputThrottled();
bool lumisectionDiscarded(unsigned int ls);

private:
bool bumpFile(unsigned int& ls,
Expand Down Expand Up @@ -263,6 +266,7 @@ namespace evf {

unsigned int nStreams_ = 0;
unsigned int nThreads_ = 0;
unsigned int nConcurrentLumis_ = 0;

bool readEolsDefinition_ = true;
unsigned int eolsNFilesIndex_ = 1;
Expand All @@ -286,6 +290,7 @@ namespace evf {
std::unique_ptr<boost::asio::ip::tcp::socket> socket_;

std::string input_throttled_file_;
std::string discard_ls_filestem_;
};
} // namespace evf

Expand Down
7 changes: 7 additions & 0 deletions EventFilter/Utilities/interface/FFFNamingSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ namespace fffnaming {
return ss.str();
}

inline std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const& stream) {
std::stringstream ss;
runLumiPrefixFill(ss, run, ls);
ss << "_" << stream << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".initemp";
return ss.str();
}

inline std::string initFileNameWithInstance(const unsigned int run,
const unsigned int ls,
std::string const& stream,
Expand Down
116 changes: 86 additions & 30 deletions EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ namespace evf {

class GlobalEvFOutputEventWriter {
public:
explicit GlobalEvFOutputEventWriter(std::string const& filePath)
: filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls)
: filePath_(filePath), ls_(ls), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}

~GlobalEvFOutputEventWriter() {}

void close() { stream_writer_events_->close(); }
bool close() {
stream_writer_events_->close();
return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
}

void doOutputEvent(EventMsgBuilder const& msg) {
EventMsgView eview(msg.startAddress());
Expand All @@ -58,6 +61,12 @@ namespace evf {

void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
throttledCheck();
discardedCheck();
if (discarded_) {
incAccepted();
msg.reset();
return;
}
auto group = iHolder.group();
writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
try {
Expand All @@ -72,13 +81,24 @@ namespace evf {

inline void throttledCheck() {
unsigned int counter = 0;
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
if (edm::shutdown_flag.load(std::memory_order_relaxed))
break;
if (!(counter % 100))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
usleep(100000);
counter++;
if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
discarded_ = true;
}
}
}

inline void discardedCheck() {
if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
discarded_ = true;
}
}

Expand All @@ -93,14 +113,17 @@ namespace evf {

private:
std::string filePath_;
const unsigned ls_;
std::atomic<unsigned long> accepted_;
edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
edm::SerialTaskQueue writeQueue_;
bool discarded_ = false;
};

class GlobalEvFOutputJSONDef {
public:
GlobalEvFOutputJSONDef(std::string const& streamLabel);
GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
void updateDestination(std::string const& streamLabel);

jsoncollector::DataPointDefinition outJsonDef_;
std::string outJsonDefName_;
Expand Down Expand Up @@ -170,12 +193,10 @@ namespace evf {

}; //end-of-class-def

GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel) {
GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;

edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();

outJsonDef_.setDefaultGroup("data");
outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
Expand All @@ -189,25 +210,31 @@ namespace evf {
outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);

std::stringstream tmpss, ss;
tmpss << baseRunDir << "/open/"
<< "output_" << getpid() << ".jsd";
std::stringstream ss;
ss << baseRunDir << "/"
<< "output_" << getpid() << ".jsd";
std::string outTmpJsonDefName = tmpss.str();
outJsonDefName_ = ss.str();

edm::Service<evf::EvFDaqDirector>()->lockInitLock();
struct stat fstat;
if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
std::string content;
jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
if (writeJsd) {
std::stringstream tmpss;
tmpss << baseRunDir << "/open/"
<< "output_" << getpid() << ".jsd";
std::string outTmpJsonDefName = tmpss.str();
edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
edm::Service<evf::EvFDaqDirector>()->lockInitLock();
struct stat fstat;
if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
std::string content;
jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
}
}
edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
}

void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
}
Expand Down Expand Up @@ -284,6 +311,21 @@ namespace evf {
<< "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
"names in FFF based HLT, but was detected in stream name";

//output initemp file. This lets hltd know number of streams early on
if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";

const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
std::ofstream file(iniFileName);
if (!file)
throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
file.close();

edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;

//create JSD
GlobalEvFOutputJSONDef(streamLabel_, true);

fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
}

Expand All @@ -305,8 +347,8 @@ namespace evf {

std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
//create run Cache holding JSON file writer and variables
auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_);

auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
jsonDef->updateDestination(streamLabel_);
edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());

//output INI file (non-const). This doesn't require globalBeginRun to be finished
Expand Down Expand Up @@ -341,17 +383,21 @@ namespace evf {
//read back file to check integrity of what was written
off_t readInput = 0;
uint32_t adlera = 1, adlerb = 0;
FILE* src = fopen(openIniFileName.c_str(), "r");
std::ifstream src(openIniFileName, std::ifstream::binary);
if (!src)
throw cms::Exception("GlobalEvFOutputModule")
<< "can not read back " << openIniFileName << " error: " << strerror(errno);

//allocate buffer to write INI file
std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
while (readInput < istat.st_size) {
size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
fread(outBuf.get(), toRead, 1, src);
cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
src.read(outBuf.get(), toRead);
//cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
readInput += toRead;
}
fclose(src);
src.close();

//clear serialization buffers
streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
Expand Down Expand Up @@ -382,7 +428,7 @@ namespace evf {
edm::LuminosityBlockForOutput const& iLB) const {
auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);

return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock());
}

void GlobalEvFOutputModule::acquire(edm::StreamID id,
Expand All @@ -403,7 +449,7 @@ namespace evf {
void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
auto lumiWriter = luminosityBlockCache(iLB.index());
//close dat file
const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();

//auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
auto jsonDef = runCache(iLB.getRun().index());
Expand All @@ -417,7 +463,17 @@ namespace evf {
jsonWriter.accepted_.value() = lumiWriter->getAccepted();

bool abortFlag = false;
jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);

if (!discarded) {
jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
} else {
jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
jsonWriter.processed_.value() = 0;
jsonWriter.accepted_.value() = 0;
edm::LogInfo("GlobalEvFOutputModule")
<< "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
}

if (abortFlag) {
edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
return;
Expand Down
15 changes: 13 additions & 2 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ namespace evf {
edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
}
}
}

void EvFDaqDirector::initRun() {
std::stringstream ss;
ss << "run" << std::setfill('0') << std::setw(6) << run_;
run_string_ = ss.str();
Expand All @@ -154,10 +152,13 @@ namespace evf {
run_nstring_ = ss.str();
run_dir_ = base_dir_ + "/" + run_string_;
input_throttled_file_ = run_dir_ + "/input_throttle";
discard_ls_filestem_ = run_dir_ + "/discard_ls";
ss = std::stringstream();
ss << getpid();
pid_ = ss.str();
}

void EvFDaqDirector::initRun() {
// check if base dir exists or create it accordingly
int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
if (retval != 0 && errno != EEXIST) {
Expand Down Expand Up @@ -322,6 +323,7 @@ namespace evf {

nThreads_ = bounds.maxNumberOfStreams();
nStreams_ = bounds.maxNumberOfThreads();
nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
}

void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
Expand Down Expand Up @@ -446,6 +448,10 @@ namespace evf {
return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
}

std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
}

std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
std::string const& stream) const {
return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
Expand Down Expand Up @@ -2067,4 +2073,9 @@ namespace evf {
return (stat(input_throttled_file_.c_str(), &buf) == 0);
}

bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
struct stat buf;
return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
}

} // namespace evf
16 changes: 16 additions & 0 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,23 @@ void FedRawDataInputSource::readSupervisor() {
while (daqDirector_->inputThrottled()) {
if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
break;

unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
bool hasDiscardedLumi = false;
for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
if (daqDirector_->lumisectionDiscarded(i)) {
edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
hasDiscardedLumi = true;
break;
}
}
if (hasDiscardedLumi)
break;

setMonStateSup(inThrottled);

if (!(counter % 50))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
usleep(100000);
Expand Down
Loading