Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[11_1_X] Protect storage accounting UDP messages from NaN, and Use StatisticsSenderService for all framework files #36357

Merged
2 changes: 1 addition & 1 deletion IOPool/Input/src/EmbeddedRootSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace edm {
InputFile::reportReadBranches();
}

void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); }
void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); }

bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache,
size_t& fileNameHash,
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/PoolSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ namespace edm {
return fb;
}

void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); }
void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }

std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }

Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ namespace edm {

RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}

void RootEmbeddedFileSequence::endJob() { closeFile_(); }
void RootEmbeddedFileSequence::endJob() { closeFile(); }

void RootEmbeddedFileSequence::closeFile_() {
// delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ namespace edm {
RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving
RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void skipEntries(unsigned int offset);
bool readOneEvent(
Expand All @@ -56,6 +55,7 @@ namespace edm {
static void fillDescription(ParameterSetDescription& desc);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
63 changes: 40 additions & 23 deletions IOPool/Input/src/RootInputFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"
#include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "TSystem.h"

Expand Down Expand Up @@ -193,7 +195,7 @@ namespace edm {
}
fileIterLastOpened_ = fileIterEnd_;
}
closeFile_();
closeFile();

if (noMoreFiles()) {
// No files specified
Expand All @@ -217,6 +219,7 @@ namespace edm {

lfn_ = logicalFileName().empty() ? fileNames()[0] : logicalFileName();
lfnHash_ = std::hash<std::string>()(lfn_);
usedFallback_ = false;

std::shared_ptr<InputFile> filePtr;
std::list<std::string> originalInfo;
Expand All @@ -225,31 +228,37 @@ namespace edm {

//this tries to open the file using multiple PFNs corresponding to different data catalogs
std::list<std::string> exInfo;
for (std::vector<std::string>::const_iterator it = fNames.begin(); it != fNames.end(); ++it) {
try {
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
std::unique_ptr<char[]> name(gSystem->ExpandPathName(it->c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
break;
} catch (cms::Exception const& e) {
if (!skipBadFiles && std::next(it) == fNames.end()) {
InputFile::reportSkippedFile((*it), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootInputFileSequence::initTheFile()");
std::ostringstream out;
out << "Input file " << (*it) << " could not be opened.";
ex.addAdditionalInfo(out.str());
//report previous exceptions when use other names to open file
for (auto const& s : exInfo)
ex.addAdditionalInfo(s);
throw ex;
} else {
exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it));
{
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
edm::Service<edm::storage::StatisticsSenderService> service;
if (service.isAvailable()) {
service->openingFile(lfn(), inputType, -1);
}
for (std::vector<std::string>::const_iterator it = fNames.begin(); it != fNames.end(); ++it) {
try {
std::unique_ptr<char[]> name(gSystem->ExpandPathName(it->c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
usedFallback_ = (it != fNames.begin());
break;
} catch (cms::Exception const& e) {
if (!skipBadFiles && std::next(it) == fNames.end()) {
InputFile::reportSkippedFile((*it), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootInputFileSequence::initTheFile()");
std::ostringstream out;
out << "Input file " << (*it) << " could not be opened.";
ex.addAdditionalInfo(out.str());
//report previous exceptions when use other names to open file
for (auto const& s : exInfo)
ex.addAdditionalInfo(s);
throw ex;
} else {
exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it));
}
}
}
}

if (filePtr) {
size_t currentIndexIntoFile = fileIter_ - fileIterBegin_;
rootFile_ = makeRootFile(filePtr);
Expand All @@ -272,6 +281,14 @@ namespace edm {
}
}

void RootInputFileSequence::closeFile() {
edm::Service<edm::storage::StatisticsSenderService> service;
if (rootFile() and service.isAvailable()) {
service->closedFile(lfn(), usedFallback());
}
closeFile_();
}

void RootInputFileSequence::setIndexIntoFile(size_t index) {
indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr();
}
Expand Down
2 changes: 2 additions & 0 deletions IOPool/Input/src/RootInputFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace edm {
std::shared_ptr<ProductRegistry const> fileProductRegistry() const;
std::shared_ptr<BranchIDListHelper const> fileBranchIDListHelper() const;

void closeFile();

protected:
typedef std::shared_ptr<RootFile> RootFileSharedPtr;
void initFile(bool skipBadFiles) { initFile_(skipBadFiles); }
Expand Down
4 changes: 2 additions & 2 deletions IOPool/Input/src/RootPrimaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace edm {

RootPrimaryFileSequence::~RootPrimaryFileSequence() {}

void RootPrimaryFileSequence::endJob() { closeFile_(); }
void RootPrimaryFileSequence::endJob() { closeFile(); }

std::unique_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
if (firstFile_) {
Expand Down Expand Up @@ -215,7 +215,7 @@ namespace edm {
// Rewind to before the first event that was read.
void RootPrimaryFileSequence::rewind_() {
if (!atFirstFile()) {
closeFile_();
closeFile();
setAtFirstFile();
}
if (!rootFile()) {
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootPrimaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace edm {
RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving

std::unique_ptr<FileBlock> readFile_();
void closeFile_() override;
void endJob();
InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event);
bool skipEvents(int offset);
Expand All @@ -56,6 +55,7 @@ namespace edm {
bool nextFile();
bool previousFile();
void rewindFile();
void closeFile_() override;

int remainingEvents() const;
int remainingLuminosityBlocks() const;
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace edm {

RootSecondaryFileSequence::~RootSecondaryFileSequence() {}

void RootSecondaryFileSequence::endJob() { closeFile_(); }
void RootSecondaryFileSequence::endJob() { closeFile(); }

void RootSecondaryFileSequence::closeFile_() {
// close the currently open file, if any, and delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace edm {
RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving
RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void initAssociationsFromSecondary(std::set<BranchID> const&);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
4 changes: 2 additions & 2 deletions IOPool/SecondaryInput/test/SecondaryProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
************************************************************/

#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/Framework/interface/EDProducer.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <memory>
Expand All @@ -19,7 +19,7 @@ namespace edm {
class ProcessConfiguration;
class VectorInputSource;

class SecondaryProducer : public EDProducer {
class SecondaryProducer : public one::EDProducer<> {
public:
/** standard constructor*/
explicit SecondaryProducer(ParameterSet const& pset);
Expand Down
2 changes: 1 addition & 1 deletion IOPool/TFileAdaptor/src/TStorageFactoryFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = ""
try {
edm::Service<edm::storage::StatisticsSenderService> statsService;
if (statsService.isAvailable()) {
statsService->setSize(storage_->size());
statsService->setSize(path, storage_->size());
}
} catch (edm::Exception const &e) {
if (e.categoryCode() != edm::errors::NotFound) {
Expand Down
55 changes: 42 additions & 13 deletions Utilities/StorageFactory/interface/StatisticsSenderService.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <sstream>
#include <atomic>
#include <mutex>
#include <tbb/concurrent_unordered_map.h>
#include "FWCore/Utilities/interface/InputType.h"

namespace edm {

Expand All @@ -16,19 +18,25 @@ namespace edm {

class StatisticsSenderService {
public:
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar);
StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar);

void setSize(size_t size);
void setCurrentServer(const std::string &servername);
void filePreCloseEvent(std::string const &lfn, bool usedFallback);
static const char *getJobID();
static bool getX509Subject(std::string &);
void setSize(const std::string& urlOrLfn, size_t size);
void setCurrentServer(const std::string& urlOrLfn, const std::string& servername);
static const char* getJobID();
static bool getX509Subject(std::string&);

void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1);
void closedFile(std::string const& lfn, bool usedFallback);

private:
void filePostCloseEvent(std::string const& lfn, bool usedFallback);

std::string const* matchedLfn(std::string const& iURL); //updates its internal cache
class FileStatistics {
public:
FileStatistics();
void fillUDP(std::ostringstream &os);
void fillUDP(std::ostringstream& os) const;
void update();

private:
ssize_t m_read_single_operations;
Expand All @@ -42,19 +50,40 @@ namespace edm {
time_t m_start_time;
};

void determineHostnames(void);
void fillUDP(const std::string &, bool, std::string &);
struct FileInfo {
explicit FileInfo(std::string const& iLFN, edm::InputType);

FileInfo(FileInfo&& iInfo)
: m_filelfn(std::move(iInfo.m_filelfn)),
m_serverhost(std::move(iInfo.m_serverhost)),
m_serverdomain(std::move(iInfo.m_serverdomain)),
m_type(iInfo.m_type),
m_size(iInfo.m_size.load()),
m_id(iInfo.m_id),
m_openCount(iInfo.m_openCount.load()) {}
std::string m_filelfn;
std::string m_serverhost;
std::string m_serverdomain;
edm::InputType m_type;
std::atomic<ssize_t> m_size;
size_t m_id; //from m_counter
std::atomic<int> m_openCount;
};

void determineHostnames();
void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const;
void cleanupOldFiles();

std::string m_clienthost;
std::string m_clientdomain;
std::string m_serverhost;
std::string m_serverdomain;
std::string m_filelfn;
tbb::concurrent_unordered_map<std::string, FileInfo> m_lfnToFileInfo;
tbb::concurrent_unordered_map<std::string, std::string> m_urlToLfn;
FileStatistics m_filestats;
std::string m_guid;
size_t m_counter;
std::atomic<ssize_t> m_size;
std::string m_userdn;
std::mutex m_servermutex;
const bool m_debug;
};

} // namespace storage
Expand Down
Loading