diff --git a/IOPool/Input/src/EmbeddedRootSource.cc b/IOPool/Input/src/EmbeddedRootSource.cc index bc9bf251666f1..f71854d29ad2f 100644 --- a/IOPool/Input/src/EmbeddedRootSource.cc +++ b/IOPool/Input/src/EmbeddedRootSource.cc @@ -45,7 +45,7 @@ namespace edm { InputFile::reportReadBranches(); } - void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); } + void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); } bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache, size_t& fileNameHash, diff --git a/IOPool/Input/src/PoolSource.cc b/IOPool/Input/src/PoolSource.cc index c3e01f338b5e3..c9cfd2167e564 100644 --- a/IOPool/Input/src/PoolSource.cc +++ b/IOPool/Input/src/PoolSource.cc @@ -162,7 +162,7 @@ namespace edm { return fb; } - void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); } + void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); } std::shared_ptr PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); } diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.cc b/IOPool/Input/src/RootEmbeddedFileSequence.cc index d2b08d26a0a07..c4358c0736dc0 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.cc +++ b/IOPool/Input/src/RootEmbeddedFileSequence.cc @@ -108,7 +108,7 @@ namespace edm { RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {} - void RootEmbeddedFileSequence::endJob() { closeFile_(); } + void RootEmbeddedFileSequence::endJob() { closeFile(); } void RootEmbeddedFileSequence::closeFile_() { // delete the RootFile object. diff --git a/IOPool/Input/src/RootEmbeddedFileSequence.h b/IOPool/Input/src/RootEmbeddedFileSequence.h index 2480ef8e19d43..1c95097cd8468 100644 --- a/IOPool/Input/src/RootEmbeddedFileSequence.h +++ b/IOPool/Input/src/RootEmbeddedFileSequence.h @@ -39,7 +39,6 @@ namespace edm { RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void skipEntries(unsigned int offset); bool readOneEvent( @@ -56,6 +55,7 @@ namespace edm { static void fillDescription(ParameterSetDescription& desc); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 1b1c24042345d..c24675ff8e9fd 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -10,6 +10,8 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" +#include "Utilities/StorageFactory/interface/StatisticsSenderService.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "TSystem.h" @@ -193,7 +195,7 @@ namespace edm { } fileIterLastOpened_ = fileIterEnd_; } - closeFile_(); + closeFile(); if (noMoreFiles()) { // No files specified @@ -217,6 +219,7 @@ namespace edm { lfn_ = logicalFileName().empty() ? fileNames()[0] : logicalFileName(); lfnHash_ = std::hash()(lfn_); + usedFallback_ = false; std::shared_ptr filePtr; std::list originalInfo; @@ -225,31 +228,37 @@ namespace edm { //this tries to open the file using multiple PFNs corresponding to different data catalogs std::list exInfo; - for (std::vector::const_iterator it = fNames.begin(); it != fNames.end(); ++it) { - try { - std::unique_ptr sentry( - input ? std::make_unique(*input, lfn_, false) : nullptr); - std::unique_ptr name(gSystem->ExpandPathName(it->c_str())); - filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); - break; - } catch (cms::Exception const& e) { - if (!skipBadFiles && std::next(it) == fNames.end()) { - InputFile::reportSkippedFile((*it), logicalFileName()); - Exception ex(errors::FileOpenError, "", e); - ex.addContext("Calling RootInputFileSequence::initTheFile()"); - std::ostringstream out; - out << "Input file " << (*it) << " could not be opened."; - ex.addAdditionalInfo(out.str()); - //report previous exceptions when use other names to open file - for (auto const& s : exInfo) - ex.addAdditionalInfo(s); - throw ex; - } else { - exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it)); + { + std::unique_ptr sentry( + input ? std::make_unique(*input, lfn_, false) : nullptr); + edm::Service service; + if (service.isAvailable()) { + service->openingFile(lfn(), inputType, -1); + } + for (std::vector::const_iterator it = fNames.begin(); it != fNames.end(); ++it) { + try { + std::unique_ptr name(gSystem->ExpandPathName(it->c_str())); + filePtr = std::make_shared(name.get(), " Initiating request to open file ", inputType); + usedFallback_ = (it != fNames.begin()); + break; + } catch (cms::Exception const& e) { + if (!skipBadFiles && std::next(it) == fNames.end()) { + InputFile::reportSkippedFile((*it), logicalFileName()); + Exception ex(errors::FileOpenError, "", e); + ex.addContext("Calling RootInputFileSequence::initTheFile()"); + std::ostringstream out; + out << "Input file " << (*it) << " could not be opened."; + ex.addAdditionalInfo(out.str()); + //report previous exceptions when use other names to open file + for (auto const& s : exInfo) + ex.addAdditionalInfo(s); + throw ex; + } else { + exInfo.push_back("Calling RootInputFileSequence::initTheFile(): fail to open the file with name " + (*it)); + } } } } - if (filePtr) { size_t currentIndexIntoFile = fileIter_ - fileIterBegin_; rootFile_ = makeRootFile(filePtr); @@ -272,6 +281,14 @@ namespace edm { } } + void RootInputFileSequence::closeFile() { + edm::Service service; + if (rootFile() and service.isAvailable()) { + service->closedFile(lfn(), usedFallback()); + } + closeFile_(); + } + void RootInputFileSequence::setIndexIntoFile(size_t index) { indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr(); } diff --git a/IOPool/Input/src/RootInputFileSequence.h b/IOPool/Input/src/RootInputFileSequence.h index 905f2af1a41de..2f241229c1f5a 100644 --- a/IOPool/Input/src/RootInputFileSequence.h +++ b/IOPool/Input/src/RootInputFileSequence.h @@ -48,6 +48,8 @@ namespace edm { std::shared_ptr fileProductRegistry() const; std::shared_ptr fileBranchIDListHelper() const; + void closeFile(); + protected: typedef std::shared_ptr RootFileSharedPtr; void initFile(bool skipBadFiles) { initFile_(skipBadFiles); } diff --git a/IOPool/Input/src/RootPrimaryFileSequence.cc b/IOPool/Input/src/RootPrimaryFileSequence.cc index de0e09b95ede4..34038bde8a677 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.cc +++ b/IOPool/Input/src/RootPrimaryFileSequence.cc @@ -69,7 +69,7 @@ namespace edm { RootPrimaryFileSequence::~RootPrimaryFileSequence() {} - void RootPrimaryFileSequence::endJob() { closeFile_(); } + void RootPrimaryFileSequence::endJob() { closeFile(); } std::unique_ptr RootPrimaryFileSequence::readFile_() { if (firstFile_) { @@ -215,7 +215,7 @@ namespace edm { // Rewind to before the first event that was read. void RootPrimaryFileSequence::rewind_() { if (!atFirstFile()) { - closeFile_(); + closeFile(); setAtFirstFile(); } if (!rootFile()) { diff --git a/IOPool/Input/src/RootPrimaryFileSequence.h b/IOPool/Input/src/RootPrimaryFileSequence.h index 903e1001c82f5..f2390be10df64 100644 --- a/IOPool/Input/src/RootPrimaryFileSequence.h +++ b/IOPool/Input/src/RootPrimaryFileSequence.h @@ -40,7 +40,6 @@ namespace edm { RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving std::unique_ptr readFile_(); - void closeFile_() override; void endJob(); InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event); bool skipEvents(int offset); @@ -56,6 +55,7 @@ namespace edm { bool nextFile(); bool previousFile(); void rewindFile(); + void closeFile_() override; int remainingEvents() const; int remainingLuminosityBlocks() const; diff --git a/IOPool/Input/src/RootSecondaryFileSequence.cc b/IOPool/Input/src/RootSecondaryFileSequence.cc index fc802d18e0c52..22dc51f3d1aea 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.cc +++ b/IOPool/Input/src/RootSecondaryFileSequence.cc @@ -52,7 +52,7 @@ namespace edm { RootSecondaryFileSequence::~RootSecondaryFileSequence() {} - void RootSecondaryFileSequence::endJob() { closeFile_(); } + void RootSecondaryFileSequence::endJob() { closeFile(); } void RootSecondaryFileSequence::closeFile_() { // close the currently open file, if any, and delete the RootFile object. diff --git a/IOPool/Input/src/RootSecondaryFileSequence.h b/IOPool/Input/src/RootSecondaryFileSequence.h index 6ff9be7c5f884..5ad6e3516f4a5 100644 --- a/IOPool/Input/src/RootSecondaryFileSequence.h +++ b/IOPool/Input/src/RootSecondaryFileSequence.h @@ -33,11 +33,11 @@ namespace edm { RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving - void closeFile_() override; void endJob(); void initAssociationsFromSecondary(std::set const&); private: + void closeFile_() override; void initFile_(bool skipBadFiles) override; RootFileSharedPtr makeRootFile(std::shared_ptr filePtr) override; diff --git a/IOPool/SecondaryInput/test/SecondaryProducer.h b/IOPool/SecondaryInput/test/SecondaryProducer.h index 7d3086649deb3..22cb7ee1832d6 100644 --- a/IOPool/SecondaryInput/test/SecondaryProducer.h +++ b/IOPool/SecondaryInput/test/SecondaryProducer.h @@ -9,7 +9,7 @@ ************************************************************/ #include "DataFormats/Provenance/interface/EventID.h" -#include "FWCore/Framework/interface/EDProducer.h" +#include "FWCore/Framework/interface/one/EDProducer.h" #include "FWCore/Utilities/interface/get_underlying_safe.h" #include @@ -19,7 +19,7 @@ namespace edm { class ProcessConfiguration; class VectorInputSource; - class SecondaryProducer : public EDProducer { + class SecondaryProducer : public one::EDProducer<> { public: /** standard constructor*/ explicit SecondaryProducer(ParameterSet const& pset); diff --git a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc index 5a8622d75d755..405453c42b876 100644 --- a/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc +++ b/IOPool/TFileAdaptor/src/TStorageFactoryFile.cc @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = "" try { edm::Service statsService; if (statsService.isAvailable()) { - statsService->setSize(storage_->size()); + statsService->setSize(path, storage_->size()); } } catch (edm::Exception const &e) { if (e.categoryCode() != edm::errors::NotFound) { diff --git a/Utilities/StorageFactory/interface/StatisticsSenderService.h b/Utilities/StorageFactory/interface/StatisticsSenderService.h index 24651dcc9f9c2..25d715e035abb 100644 --- a/Utilities/StorageFactory/interface/StatisticsSenderService.h +++ b/Utilities/StorageFactory/interface/StatisticsSenderService.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include "FWCore/Utilities/interface/InputType.h" namespace edm { @@ -16,19 +18,25 @@ namespace edm { class StatisticsSenderService { public: - StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar); + StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar); - void setSize(size_t size); - void setCurrentServer(const std::string &servername); - void filePreCloseEvent(std::string const &lfn, bool usedFallback); - static const char *getJobID(); - static bool getX509Subject(std::string &); + void setSize(const std::string& urlOrLfn, size_t size); + void setCurrentServer(const std::string& urlOrLfn, const std::string& servername); + static const char* getJobID(); + static bool getX509Subject(std::string&); + + void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1); + void closedFile(std::string const& lfn, bool usedFallback); private: + void filePostCloseEvent(std::string const& lfn, bool usedFallback); + + std::string const* matchedLfn(std::string const& iURL); //updates its internal cache class FileStatistics { public: FileStatistics(); - void fillUDP(std::ostringstream &os); + void fillUDP(std::ostringstream& os) const; + void update(); private: ssize_t m_read_single_operations; @@ -42,19 +50,40 @@ namespace edm { time_t m_start_time; }; - void determineHostnames(void); - void fillUDP(const std::string &, bool, std::string &); + struct FileInfo { + explicit FileInfo(std::string const& iLFN, edm::InputType); + + FileInfo(FileInfo&& iInfo) + : m_filelfn(std::move(iInfo.m_filelfn)), + m_serverhost(std::move(iInfo.m_serverhost)), + m_serverdomain(std::move(iInfo.m_serverdomain)), + m_type(iInfo.m_type), + m_size(iInfo.m_size.load()), + m_id(iInfo.m_id), + m_openCount(iInfo.m_openCount.load()) {} + std::string m_filelfn; + std::string m_serverhost; + std::string m_serverdomain; + edm::InputType m_type; + std::atomic m_size; + size_t m_id; //from m_counter + std::atomic m_openCount; + }; + + void determineHostnames(); + void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const; + void cleanupOldFiles(); + std::string m_clienthost; std::string m_clientdomain; - std::string m_serverhost; - std::string m_serverdomain; - std::string m_filelfn; + tbb::concurrent_unordered_map m_lfnToFileInfo; + tbb::concurrent_unordered_map m_urlToLfn; FileStatistics m_filestats; std::string m_guid; size_t m_counter; - std::atomic m_size; std::string m_userdn; std::mutex m_servermutex; + const bool m_debug; }; } // namespace storage diff --git a/Utilities/StorageFactory/src/StatisticsSenderService.cc b/Utilities/StorageFactory/src/StatisticsSenderService.cc index b627a8a5067b0..0e96624fe1ef0 100644 --- a/Utilities/StorageFactory/src/StatisticsSenderService.cc +++ b/Utilities/StorageFactory/src/StatisticsSenderService.cc @@ -4,6 +4,7 @@ #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/Catalog/interface/SiteLocalConfig.h" #include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/Utilities/src/Guid.h" #include @@ -15,11 +16,7 @@ #include #include -#define UPDATE_STATISTIC(x) m_##x = x; - -#define UPDATE_AND_OUTPUT_STATISTIC(x) \ - os << "\"" #x "\":" << (x - m_##x) << ", "; \ - UPDATE_STATISTIC(x) +#define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; // Simple hack to define HOST_NAME_MAX on Mac. // Allows arrays to be statically allocated @@ -27,8 +24,8 @@ #define HOST_NAME_MAX 128 #endif -#define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID" -#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId" +static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID"; +static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId"; using namespace edm::storage; @@ -43,7 +40,7 @@ StatisticsSenderService::FileStatistics::FileStatistics() m_read_vector_count_square(0), m_start_time(time(nullptr)) {} -void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { +void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const { const StorageAccount::StorageStats &stats = StorageAccount::summary(); ssize_t read_single_operations = 0; ssize_t read_single_bytes = 0; @@ -77,35 +74,31 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { double single_sum = read_single_bytes - m_read_single_bytes; double single_average = single_sum / static_cast(single_op_count); os << "\"read_single_sigma\":" - << sqrt((static_cast(read_single_square - m_read_single_square) - - single_average * single_average * single_op_count) / - static_cast(single_op_count)) + << sqrt(std::abs((static_cast(read_single_square - m_read_single_square) - + single_average * single_average * single_op_count) / + static_cast(single_op_count))) << ", "; os << "\"read_single_average\":" << single_average << ", "; } - m_read_single_square = read_single_square; int64_t vector_op_count = read_vector_operations - m_read_vector_operations; if (vector_op_count > 0) { double vector_average = static_cast(read_vector_bytes - m_read_vector_bytes) / static_cast(vector_op_count); os << "\"read_vector_average\":" << vector_average << ", "; os << "\"read_vector_sigma\":" - << sqrt((static_cast(read_vector_square - m_read_vector_square) - - vector_average * vector_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_square - m_read_vector_square) - + vector_average * vector_average * vector_op_count) / + static_cast(vector_op_count))) << ", "; double vector_count_average = static_cast(read_vector_count_sum - m_read_vector_count_sum) / static_cast(vector_op_count); os << "\"read_vector_count_average\":" << vector_count_average << ", "; os << "\"read_vector_count_sigma\":" - << sqrt((static_cast(read_vector_count_square - m_read_vector_count_square) - - vector_count_average * vector_count_average * vector_op_count) / - static_cast(vector_op_count)) + << sqrt(std::abs((static_cast(read_vector_count_square - m_read_vector_count_square) - + vector_count_average * vector_count_average * vector_op_count) / + static_cast(vector_op_count))) << ", "; } - m_read_vector_square = read_vector_square; - m_read_vector_count_square = read_vector_count_square; - m_read_vector_count_sum = read_vector_count_sum; os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; @@ -113,30 +106,75 @@ void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) { << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", "; // See top of file for macros; not complex, just avoiding copy/paste - UPDATE_AND_OUTPUT_STATISTIC(read_single_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations) - UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes) + OUTPUT_STATISTIC(read_single_operations) + OUTPUT_STATISTIC(read_single_bytes) + OUTPUT_STATISTIC(read_vector_operations) + OUTPUT_STATISTIC(read_vector_bytes) os << "\"start_time\":" << m_start_time << ", "; - m_start_time = time(nullptr); // NOTE: last entry doesn't have the trailing comma. - os << "\"end_time\":" << m_start_time; + os << "\"end_time\":" << time(nullptr); } -StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const & /*pset*/, edm::ActivityRegistry &ar) - : m_clienthost("unknown"), - m_clientdomain("unknown"), +void StatisticsSenderService::FileStatistics::update() { + const StorageAccount::StorageStats &stats = StorageAccount::summary(); + ssize_t read_single_operations = 0; + ssize_t read_single_bytes = 0; + ssize_t read_single_square = 0; + ssize_t read_vector_operations = 0; + ssize_t read_vector_bytes = 0; + ssize_t read_vector_square = 0; + ssize_t read_vector_count_sum = 0; + ssize_t read_vector_count_square = 0; + auto token = StorageAccount::tokenForStorageClassName("tstoragefile"); + for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) { + if (i->first == token.value()) { + continue; + } + for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) { + if (j->first == static_cast(StorageAccount::Operation::readv)) { + read_vector_operations += j->second.attempts; + read_vector_bytes += j->second.amount; + read_vector_count_square += j->second.vector_square; + read_vector_square += j->second.amount_square; + read_vector_count_sum += j->second.vector_count; + } else if (j->first == static_cast(StorageAccount::Operation::read)) { + read_single_operations += j->second.attempts; + read_single_bytes += j->second.amount; + read_single_square += j->second.amount_square; + } + } + } + + m_read_single_square = read_single_square; + m_read_vector_square = read_vector_square; + m_read_vector_count_square = read_vector_count_square; + m_read_vector_count_sum = read_vector_count_sum; + m_read_single_operations = read_single_operations; + m_read_single_bytes = read_single_bytes; + m_read_vector_operations = read_vector_operations; + m_read_vector_bytes = read_vector_bytes; + m_start_time = time(nullptr); +} +StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN, edm::InputType iType) + : m_filelfn(iLFN), m_serverhost("unknown"), m_serverdomain("unknown"), - m_filelfn("unknown"), + m_type(iType), + m_size(-1), + m_id(0), + m_openCount(1) {} + +StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar) + : m_clienthost("unknown"), + m_clientdomain("unknown"), m_filestats(), m_guid(Guid().toString()), m_counter(0), - m_size(-1), - m_userdn("unknown") { + m_userdn("unknown"), + m_debug(iPSet.getUntrackedParameter("debug", false)) { determineHostnames(); - ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent); + ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent); if (!getX509Subject(m_userdn)) { m_userdn = "unknown"; } @@ -148,8 +186,36 @@ const char *StatisticsSenderService::getJobID() { return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2); } -void StatisticsSenderService::setCurrentServer(const std::string &servername) { - size_t dot_pos = servername.find("."); +std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) { + auto found = m_urlToLfn.find(iURL); + if (found != m_urlToLfn.end()) { + return &found->second; + } + for (auto const &v : m_lfnToFileInfo) { + if (v.first.size() < iURL.size()) { + if (v.first == iURL.substr(iURL.size() - v.first.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + //does the lfn have a protocol and the iURL not? + if (std::string::npos == iURL.find(':')) { + for (auto const &v : m_lfnToFileInfo) { + if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) { + if (iURL == v.first.substr(v.first.size() - iURL.size())) { + m_urlToLfn.emplace(iURL, v.first); + return &m_urlToLfn.find(iURL)->second; + } + } + } + } + + return nullptr; +} + +void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) { + size_t dot_pos = servername.find('.'); std::string serverhost; std::string serverdomain; if (dot_pos == std::string::npos) { @@ -163,24 +229,41 @@ void StatisticsSenderService::setCurrentServer(const std::string &servername) { } } { + auto lfn = matchedLfn(url); std::lock_guard sentry(m_servermutex); - m_serverhost = std::move(serverhost); - m_serverdomain = std::move(serverdomain); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + if (found != m_lfnToFileInfo.end()) { + found->second.m_serverhost = std::move(serverhost); + found->second.m_serverdomain = std::move(serverdomain); + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n"; + } } } -void StatisticsSenderService::setSize(size_t size) { m_size = size; } - -void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) { - m_filelfn = lfn; +void StatisticsSenderService::openingFile(std::string const &lfn, edm::InputType type, size_t size) { + m_urlToLfn.emplace(lfn, lfn); + auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type}); + if (attempt.second) { + attempt.first->second.m_size = size; + attempt.first->second.m_id = m_counter++; + edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n"; + } else { + ++(attempt.first->second.m_openCount); + edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n"; + } +} +void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) { edm::Service pSLC; if (!pSLC.isAvailable()) { return; } const struct addrinfo *addresses = pSLC->statisticsDestination(); - if (!addresses) { + if (!addresses and !m_debug) { return; } @@ -190,22 +273,86 @@ void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool use m_userdn = "not reported"; } - std::string results; - fillUDP(pSLC->siteName(), usedFallback, results); + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto found = m_lfnToFileInfo.find(*lfn); + assert(found != m_lfnToFileInfo.end()); - for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { - int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); - if (sock < 0) { - continue; + std::string results; + fillUDP(pSLC->siteName(), found->second, usedFallback, results); + if (m_debug) { + edm::LogSystem("StatisticSenderService") << "\n" << results << "\n"; } - auto close_del = [](int *iSocket) { close(*iSocket); }; - std::unique_ptr guard(&sock, close_del); - if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { - break; + + for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) { + int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock < 0) { + continue; + } + auto close_del = [](int *iSocket) { close(*iSocket); }; + std::unique_ptr guard(&sock, close_del); + if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) { + break; + } } + + auto c = --found->second.m_openCount; + if (m_debug) { + if (c == 0) { + edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n"; + } else { + edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n"; + } + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n"; + } +} + +void StatisticsSenderService::cleanupOldFiles() { + //remove entries with openCount of 0 + bool moreToTest = false; + do { + moreToTest = false; + for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) { + if (it->second.m_openCount == 0) { + auto lfn = it->first; + bool moreToTest2 = false; + do { + moreToTest2 = false; + for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) { + if (it2->second == lfn) { + m_urlToLfn.unsafe_erase(it2); + moreToTest2 = true; + break; + } + } + } while (moreToTest2); + + m_lfnToFileInfo.unsafe_erase(it); + moreToTest = true; + break; + } + } + } while (moreToTest); +} + +void StatisticsSenderService::setSize(const std::string &url, size_t size) { + auto lfn = matchedLfn(url); + if (nullptr != lfn) { + auto itFound = m_lfnToFileInfo.find(*lfn); + if (itFound != m_lfnToFileInfo.end()) { + itFound->second.m_size = size; + } + } else if (m_debug) { + edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n"; } +} - m_counter++; +void StatisticsSenderService::filePostCloseEvent(std::string const &lfn, bool usedFallback) { + //we are at a sync point in the framwework so no new files are being opened + cleanupOldFiles(); + m_filestats.update(); } void StatisticsSenderService::determineHostnames(void) { @@ -225,7 +372,10 @@ void StatisticsSenderService::determineHostnames(void) { } } -void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFallback, std::string &udpinfo) { +void StatisticsSenderService::fillUDP(const std::string &siteName, + const FileInfo &fileinfo, + bool usedFallback, + std::string &udpinfo) const { std::ostringstream os; // Header - same for all IO accesses @@ -235,22 +385,34 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall } if (usedFallback) { os << "\"fallback\": true, "; + } else { + os << "\"fallback\": false, "; } - std::string serverhost; - std::string serverdomain; - { - std::lock_guard sentry(m_servermutex); - serverhost = m_serverhost; - serverdomain = m_serverdomain; + os << "\"type\": "; + switch (fileinfo.m_type) { + case edm::InputType::Primary: { + os << "\"primary\", "; + break; + } + case edm::InputType::SecondaryFile: { + os << "\"secondary\", "; + break; + } + case edm::InputType::SecondarySource: { + os << "\"embedded\", "; + break; + } } + auto serverhost = fileinfo.m_serverhost; + auto serverdomain = fileinfo.m_serverdomain; os << "\"user_dn\":\"" << m_userdn << "\", "; os << "\"client_host\":\"" << m_clienthost << "\", "; os << "\"client_domain\":\"" << m_clientdomain << "\", "; os << "\"server_host\":\"" << serverhost << "\", "; os << "\"server_domain\":\"" << serverdomain << "\", "; - os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", "; - os << "\"file_lfn\":\"" << m_filelfn << "\", "; + os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", "; + os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", "; // Dashboard devs requested that we send out no app_info if a job ID // is not present in the environment. const char *jobId = getJobID(); @@ -258,8 +420,8 @@ void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFall os << "\"app_info\":\"" << jobId << "\", "; } - if (m_size >= 0) { - os << "\"file_size\":" << m_size << ", "; + if (fileinfo.m_size >= 0) { + os << "\"file_size\":" << fileinfo.m_size << ", "; } m_filestats.fillUDP(os); diff --git a/Utilities/StorageFactory/test/BuildFile.xml b/Utilities/StorageFactory/test/BuildFile.xml index 1ef9d5b865f8f..25578023ce536 100644 --- a/Utilities/StorageFactory/test/BuildFile.xml +++ b/Utilities/StorageFactory/test/BuildFile.xml @@ -42,6 +42,7 @@ +