Skip to content

Commit

Permalink
Merge pull request #35505 from Dr15Jones/updateStatisticsSenderService
Browse files Browse the repository at this point in the history
Use StatisticsSenderService for all framework files
  • Loading branch information
cmsbuild authored Oct 23, 2021
2 parents ea43e07 + 56bde9a commit 7356243
Show file tree
Hide file tree
Showing 22 changed files with 455 additions and 102 deletions.
2 changes: 1 addition & 1 deletion IOPool/Input/src/EmbeddedRootSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace edm {
InputFile::reportReadBranches();
}

void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); }
void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); }

bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache,
size_t& fileNameHash,
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/PoolSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ namespace edm {
return fb;
}

void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); }
void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }

std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }

Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ namespace edm {

RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}

void RootEmbeddedFileSequence::endJob() { closeFile_(); }
void RootEmbeddedFileSequence::endJob() { closeFile(); }

void RootEmbeddedFileSequence::closeFile_() {
// delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ namespace edm {
RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving
RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void skipEntries(unsigned int offset);
bool readOneEvent(
Expand All @@ -56,6 +55,7 @@ namespace edm {
static void fillDescription(ParameterSetDescription& desc);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
61 changes: 38 additions & 23 deletions IOPool/Input/src/RootInputFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"
#include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "TSystem.h"

Expand Down Expand Up @@ -208,7 +210,7 @@ namespace edm {
}
fileIterLastOpened_ = fileIterEnd_;
}
closeFile_();
closeFile();

if (noMoreFiles()) {
// No files specified
Expand Down Expand Up @@ -240,31 +242,36 @@ namespace edm {

//this tries to open the file using multiple PFNs corresponding to different data catalogs
std::list<std::string> exInfo;
for (std::vector<std::string>::const_iterator it = fNames.begin(); it != fNames.end(); ++it) {
try {
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
std::unique_ptr<char[]> name(gSystem->ExpandPathName(it->c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
break;
} catch (cms::Exception const& e) {
if (!skipBadFiles && std::next(it) == fNames.end()) {
InputFile::reportSkippedFile((*it), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootInputFileSequence::initTheFile()");
std::ostringstream out;
out << "Input file " << (*it) << " could not be opened.";
ex.addAdditionalInfo(out.str());
//report previous exceptions when use other names to open file
for (auto const& s : exInfo)
ex.addAdditionalInfo(s);
throw ex;
} else {
exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it));
{
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
edm::Service<edm::storage::StatisticsSenderService> service;
if (service.isAvailable()) {
service->openingFile(lfn(), inputType, -1);
}
for (std::vector<std::string>::const_iterator it = fNames.begin(); it != fNames.end(); ++it) {
try {
std::unique_ptr<char[]> name(gSystem->ExpandPathName(it->c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
break;
} catch (cms::Exception const& e) {
if (!skipBadFiles && std::next(it) == fNames.end()) {
InputFile::reportSkippedFile((*it), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootInputFileSequence::initTheFile()");
std::ostringstream out;
out << "Input file " << (*it) << " could not be opened.";
ex.addAdditionalInfo(out.str());
//report previous exceptions when use other names to open file
for (auto const& s : exInfo)
ex.addAdditionalInfo(s);
throw ex;
} else {
exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it));
}
}
}
}

if (filePtr) {
size_t currentIndexIntoFile = fileIter_ - fileIterBegin_;
rootFile_ = makeRootFile(filePtr);
Expand All @@ -287,6 +294,14 @@ namespace edm {
}
}

void RootInputFileSequence::closeFile() {
edm::Service<edm::storage::StatisticsSenderService> service;
if (rootFile() and service.isAvailable()) {
service->closedFile(lfn(), usedFallback());
}
closeFile_();
}

void RootInputFileSequence::setIndexIntoFile(size_t index) {
indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr();
}
Expand Down
2 changes: 2 additions & 0 deletions IOPool/Input/src/RootInputFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ namespace edm {
std::shared_ptr<ProductRegistry const> fileProductRegistry() const;
std::shared_ptr<BranchIDListHelper const> fileBranchIDListHelper() const;

void closeFile();

protected:
typedef std::shared_ptr<RootFile> RootFileSharedPtr;
void initFile(bool skipBadFiles) { initFile_(skipBadFiles); }
Expand Down
4 changes: 2 additions & 2 deletions IOPool/Input/src/RootPrimaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace edm {

RootPrimaryFileSequence::~RootPrimaryFileSequence() {}

void RootPrimaryFileSequence::endJob() { closeFile_(); }
void RootPrimaryFileSequence::endJob() { closeFile(); }

std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
std::shared_ptr<FileBlock> fileBlock;
Expand Down Expand Up @@ -246,7 +246,7 @@ namespace edm {
// Rewind to before the first event that was read.
void RootPrimaryFileSequence::rewind_() {
if (!atFirstFile()) {
closeFile_();
closeFile();
setAtFirstFile();
}
if (!rootFile()) {
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootPrimaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace edm {
RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving

std::shared_ptr<FileBlock> readFile_();
void closeFile_() override;
void endJob();
InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event);
void skipEventsAtBeginning(int offset);
Expand All @@ -57,6 +56,7 @@ namespace edm {
bool nextFile();
bool previousFile();
void rewindFile();
void closeFile_() override;

int remainingEvents() const;
int remainingLuminosityBlocks() const;
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace edm {

RootSecondaryFileSequence::~RootSecondaryFileSequence() {}

void RootSecondaryFileSequence::endJob() { closeFile_(); }
void RootSecondaryFileSequence::endJob() { closeFile(); }

void RootSecondaryFileSequence::closeFile_() {
// close the currently open file, if any, and delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace edm {
RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving
RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void initAssociationsFromSecondary(std::set<BranchID> const&);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
2 changes: 1 addition & 1 deletion IOPool/TFileAdaptor/src/TStorageFactoryFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = ""
try {
edm::Service<edm::storage::StatisticsSenderService> statsService;
if (statsService.isAvailable()) {
statsService->setSize(storage_->size());
statsService->setSize(path, storage_->size());
}
} catch (edm::Exception const &e) {
if (e.categoryCode() != edm::errors::NotFound) {
Expand Down
55 changes: 42 additions & 13 deletions Utilities/StorageFactory/interface/StatisticsSenderService.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <sstream>
#include <atomic>
#include <mutex>
#include <tbb/concurrent_unordered_map.h>
#include "FWCore/Utilities/interface/InputType.h"

namespace edm {

Expand All @@ -16,19 +18,25 @@ namespace edm {

class StatisticsSenderService {
public:
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar);
StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar);

void setSize(size_t size);
void setCurrentServer(const std::string &servername);
void filePreCloseEvent(std::string const &lfn, bool usedFallback);
static const char *getJobID();
static bool getX509Subject(std::string &);
void setSize(const std::string& urlOrLfn, size_t size);
void setCurrentServer(const std::string& urlOrLfn, const std::string& servername);
static const char* getJobID();
static bool getX509Subject(std::string&);

void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1);
void closedFile(std::string const& lfn, bool usedFallback);

private:
void filePostCloseEvent(std::string const& lfn, bool usedFallback);

std::string const* matchedLfn(std::string const& iURL); //updates its internal cache
class FileStatistics {
public:
FileStatistics();
void fillUDP(std::ostringstream &os);
void fillUDP(std::ostringstream& os) const;
void update();

private:
ssize_t m_read_single_operations;
Expand All @@ -42,19 +50,40 @@ namespace edm {
time_t m_start_time;
};

void determineHostnames(void);
void fillUDP(const std::string &, bool, std::string &);
struct FileInfo {
explicit FileInfo(std::string const& iLFN, edm::InputType);

FileInfo(FileInfo&& iInfo)
: m_filelfn(std::move(iInfo.m_filelfn)),
m_serverhost(std::move(iInfo.m_serverhost)),
m_serverdomain(std::move(iInfo.m_serverdomain)),
m_type(iInfo.m_type),
m_size(iInfo.m_size.load()),
m_id(iInfo.m_id),
m_openCount(iInfo.m_openCount.load()) {}
std::string m_filelfn;
std::string m_serverhost;
std::string m_serverdomain;
edm::InputType m_type;
std::atomic<ssize_t> m_size;
size_t m_id; //from m_counter
std::atomic<int> m_openCount;
};

void determineHostnames();
void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const;
void cleanupOldFiles();

std::string m_clienthost;
std::string m_clientdomain;
std::string m_serverhost;
std::string m_serverdomain;
std::string m_filelfn;
tbb::concurrent_unordered_map<std::string, FileInfo> m_lfnToFileInfo;
tbb::concurrent_unordered_map<std::string, std::string> m_urlToLfn;
FileStatistics m_filestats;
std::string m_guid;
size_t m_counter;
std::atomic<ssize_t> m_size;
std::string m_userdn;
std::mutex m_servermutex;
const bool m_debug;
};

} // namespace storage
Expand Down
Loading

0 comments on commit 7356243

Please sign in to comment.