Skip to content

Commit

Permalink
Merge pull request #36358 from makortel/backportStatisticsSenderServi…
Browse files Browse the repository at this point in the history
…ce_110x

[11_0_X] Protect storage accounting UDP messages from NaN, and Use StatisticsSenderService for all framework files
  • Loading branch information
cmsbuild authored Dec 10, 2021
2 parents 6bc0e05 + ed0609f commit 085def3
Show file tree
Hide file tree
Showing 23 changed files with 489 additions and 136 deletions.
2 changes: 1 addition & 1 deletion IOPool/Input/src/EmbeddedRootSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace edm {
InputFile::reportReadBranches();
}

void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); }
void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); }

bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache,
size_t& fileNameHash,
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/PoolSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ namespace edm {
return fb;
}

void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); }
void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }

std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }

Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ namespace edm {

RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}

void RootEmbeddedFileSequence::endJob() { closeFile_(); }
void RootEmbeddedFileSequence::endJob() { closeFile(); }

void RootEmbeddedFileSequence::closeFile_() {
// delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ namespace edm {
RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving
RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void skipEntries(unsigned int offset);
bool readOneEvent(
Expand All @@ -56,6 +55,7 @@ namespace edm {
static void fillDescription(ParameterSetDescription& desc);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
103 changes: 58 additions & 45 deletions IOPool/Input/src/RootInputFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"
#include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "TSystem.h"

Expand Down Expand Up @@ -192,7 +194,7 @@ namespace edm {
}
fileIterLastOpened_ = fileIterEnd_;
}
closeFile_();
closeFile();

if (noMoreFiles()) {
// No files specified
Expand Down Expand Up @@ -224,58 +226,61 @@ namespace edm {

std::shared_ptr<InputFile> filePtr;
std::list<std::string> originalInfo;
try {
{
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, usedFallback_) : nullptr);
std::unique_ptr<char[]> name(gSystem->ExpandPathName(fileName().c_str()));
;
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
if (hasFallbackUrl) {
std::ostringstream out;
out << e.explainSelf();

std::unique_ptr<char[]> name(gSystem->ExpandPathName(fallbackFileName().c_str()));
std::string pfn(name.get());
InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str());
originalInfo = e.additionalInfo();
} else {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.";
ex.addAdditionalInfo(out.str());
throw ex;
}
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
edm::Service<edm::storage::StatisticsSenderService> service;
if (service.isAvailable()) {
service->openingFile(lfn(), inputType, -1);
}
}
if (!filePtr && (hasFallbackUrl)) {
try {
usedFallback_ = true;
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, usedFallback_) : nullptr);
std::unique_ptr<char[]> fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str()));
filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType));
std::unique_ptr<char[]> name(gSystem->ExpandPathName(fileName().c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FallbackFileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.\n";
out << "Fallback Input file " << fallbackFileName() << " also could not be opened.";
if (!originalInfo.empty()) {
out << std::endl << "Original exception info is above; fallback exception info is below.";
ex.addAdditionalInfo(out.str());
for (auto const& s : originalInfo) {
ex.addAdditionalInfo(s);
}
if (hasFallbackUrl) {
std::ostringstream out;
out << e.explainSelf();

std::unique_ptr<char[]> name(gSystem->ExpandPathName(fallbackFileName().c_str()));
std::string pfn(name.get());
InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str());
originalInfo = e.additionalInfo();
} else {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.";
ex.addAdditionalInfo(out.str());
throw ex;
}
}
}
if (!filePtr && (hasFallbackUrl)) {
try {
usedFallback_ = true;
std::unique_ptr<char[]> fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str()));
filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType));
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FallbackFileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.\n";
out << "Fallback Input file " << fallbackFileName() << " also could not be opened.";
if (!originalInfo.empty()) {
out << std::endl << "Original exception info is above; fallback exception info is below.";
ex.addAdditionalInfo(out.str());
for (auto const& s : originalInfo) {
ex.addAdditionalInfo(s);
}
} else {
ex.addAdditionalInfo(out.str());
}
throw ex;
}
throw ex;
}
}
}
Expand All @@ -299,6 +304,14 @@ namespace edm {
}
}

void RootInputFileSequence::closeFile() {
edm::Service<edm::storage::StatisticsSenderService> service;
if (rootFile() and service.isAvailable()) {
service->closedFile(lfn(), usedFallback());
}
closeFile_();
}

void RootInputFileSequence::setIndexIntoFile(size_t index) {
indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr();
}
Expand Down
2 changes: 2 additions & 0 deletions IOPool/Input/src/RootInputFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace edm {
std::shared_ptr<ProductRegistry const> fileProductRegistry() const;
std::shared_ptr<BranchIDListHelper const> fileBranchIDListHelper() const;

void closeFile();

protected:
typedef std::shared_ptr<RootFile> RootFileSharedPtr;
void initFile(bool skipBadFiles) { initFile_(skipBadFiles); }
Expand Down
4 changes: 2 additions & 2 deletions IOPool/Input/src/RootPrimaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace edm {

RootPrimaryFileSequence::~RootPrimaryFileSequence() {}

void RootPrimaryFileSequence::endJob() { closeFile_(); }
void RootPrimaryFileSequence::endJob() { closeFile(); }

std::unique_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
if (firstFile_) {
Expand Down Expand Up @@ -212,7 +212,7 @@ namespace edm {
// Rewind to before the first event that was read.
void RootPrimaryFileSequence::rewind_() {
if (!atFirstFile()) {
closeFile_();
closeFile();
setAtFirstFile();
}
if (!rootFile()) {
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootPrimaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace edm {
RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving

std::unique_ptr<FileBlock> readFile_();
void closeFile_() override;
void endJob();
InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event);
bool skipEvents(int offset);
Expand All @@ -56,6 +55,7 @@ namespace edm {
bool nextFile();
bool previousFile();
void rewindFile();
void closeFile_() override;

int remainingEvents() const;
int remainingLuminosityBlocks() const;
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace edm {

RootSecondaryFileSequence::~RootSecondaryFileSequence() {}

void RootSecondaryFileSequence::endJob() { closeFile_(); }
void RootSecondaryFileSequence::endJob() { closeFile(); }

void RootSecondaryFileSequence::closeFile_() {
// close the currently open file, if any, and delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace edm {
RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving
RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void initAssociationsFromSecondary(std::set<BranchID> const&);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
4 changes: 2 additions & 2 deletions IOPool/SecondaryInput/test/SecondaryProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
************************************************************/

#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/Framework/interface/EDProducer.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <memory>
Expand All @@ -19,7 +19,7 @@ namespace edm {
class ProcessConfiguration;
class VectorInputSource;

class SecondaryProducer : public EDProducer {
class SecondaryProducer : public one::EDProducer<> {
public:
/** standard constructor*/
explicit SecondaryProducer(ParameterSet const& pset);
Expand Down
2 changes: 1 addition & 1 deletion IOPool/TFileAdaptor/src/TStorageFactoryFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = ""
try {
edm::Service<edm::storage::StatisticsSenderService> statsService;
if (statsService.isAvailable()) {
statsService->setSize(storage_->size());
statsService->setSize(path, storage_->size());
}
} catch (edm::Exception const &e) {
if (e.categoryCode() != edm::errors::NotFound) {
Expand Down
55 changes: 42 additions & 13 deletions Utilities/StorageFactory/interface/StatisticsSenderService.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <sstream>
#include <atomic>
#include <mutex>
#include <tbb/concurrent_unordered_map.h>
#include "FWCore/Utilities/interface/InputType.h"

namespace edm {

Expand All @@ -16,19 +18,25 @@ namespace edm {

class StatisticsSenderService {
public:
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar);
StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar);

void setSize(size_t size);
void setCurrentServer(const std::string &servername);
void filePreCloseEvent(std::string const &lfn, bool usedFallback);
static const char *getJobID();
static bool getX509Subject(std::string &);
void setSize(const std::string& urlOrLfn, size_t size);
void setCurrentServer(const std::string& urlOrLfn, const std::string& servername);
static const char* getJobID();
static bool getX509Subject(std::string&);

void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1);
void closedFile(std::string const& lfn, bool usedFallback);

private:
void filePostCloseEvent(std::string const& lfn, bool usedFallback);

std::string const* matchedLfn(std::string const& iURL); //updates its internal cache
class FileStatistics {
public:
FileStatistics();
void fillUDP(std::ostringstream &os);
void fillUDP(std::ostringstream& os) const;
void update();

private:
ssize_t m_read_single_operations;
Expand All @@ -42,19 +50,40 @@ namespace edm {
time_t m_start_time;
};

void determineHostnames(void);
void fillUDP(const std::string &, bool, std::string &);
struct FileInfo {
explicit FileInfo(std::string const& iLFN, edm::InputType);

FileInfo(FileInfo&& iInfo)
: m_filelfn(std::move(iInfo.m_filelfn)),
m_serverhost(std::move(iInfo.m_serverhost)),
m_serverdomain(std::move(iInfo.m_serverdomain)),
m_type(iInfo.m_type),
m_size(iInfo.m_size.load()),
m_id(iInfo.m_id),
m_openCount(iInfo.m_openCount.load()) {}
std::string m_filelfn;
std::string m_serverhost;
std::string m_serverdomain;
edm::InputType m_type;
std::atomic<ssize_t> m_size;
size_t m_id; //from m_counter
std::atomic<int> m_openCount;
};

void determineHostnames();
void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const;
void cleanupOldFiles();

std::string m_clienthost;
std::string m_clientdomain;
std::string m_serverhost;
std::string m_serverdomain;
std::string m_filelfn;
tbb::concurrent_unordered_map<std::string, FileInfo> m_lfnToFileInfo;
tbb::concurrent_unordered_map<std::string, std::string> m_urlToLfn;
FileStatistics m_filestats;
std::string m_guid;
size_t m_counter;
std::atomic<ssize_t> m_size;
std::string m_userdn;
std::mutex m_servermutex;
const bool m_debug;
};

} // namespace storage
Expand Down
Loading

0 comments on commit 085def3

Please sign in to comment.