From 67071cc703c0a08001cc44cf383c5fa588252828 Mon Sep 17 00:00:00 2001 From: Jan Chyczynski Date: Thu, 18 Aug 2022 11:41:22 +0200 Subject: [PATCH] created perLS PopCon: cloned old one, removed irrelevant code and adjusted the rest --- .../LHCInfoPerLSPopConSourceHandler.h | 63 +++ CondTools/RunInfo/plugins/BuildFile.xml | 6 +- .../plugins/LHCInfoPerLSPopConAnalyzer.cc | 7 + .../LHCInfoPerLSPopConAnalyzerEndFill.py | 88 ++++ .../LHCInfoPerLSPopConAnalyzerStartFill.py | 88 ++++ .../src/LHCInfoPerLSPopConSourceHandler.cc | 479 ++++++++++++++++++ 6 files changed, 730 insertions(+), 1 deletion(-) create mode 100644 CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h create mode 100644 CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc create mode 100644 CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerEndFill.py create mode 100644 CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerStartFill.py create mode 100644 CondTools/RunInfo/src/LHCInfoPerLSPopConSourceHandler.cc diff --git a/CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h b/CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h new file mode 100644 index 0000000000000..de5c54b8e8521 --- /dev/null +++ b/CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h @@ -0,0 +1,63 @@ +#ifndef LHCINFOPOPCONSOURCEHANDLER_H +#define LHCINFOPOPCONSOURCEHANDLER_H + +#include + +#include "CondCore/PopCon/interface/PopConSourceHandler.h" +#include "CondFormats/RunInfo/interface/LHCInfoPerLS.h" +#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" +#include "CondCore/CondDB/interface/Types.h" +#include "CondTools/RunInfo/interface/OMSAccess.h" + + +namespace cond { + class OMSService; +} + +class LHCInfoPerLSPopConSourceHandler : public popcon::PopConSourceHandler { +public: + LHCInfoPerLSPopConSourceHandler(const edm::ParameterSet& pset); + ~LHCInfoPerLSPopConSourceHandler() override; + void getNewObjects() override; + std::string id() const override; + + static constexpr unsigned int kLumisectionsQueryLimit = 4000; + +private: + void addEmptyPayload(cond::Time_t iov); + + bool makeFillPayload(std::unique_ptr& targetPayload, + const cond::OMSServiceResult& queryResult); + + + size_t getLumiData(const cond::OMSService& service, + unsigned short fillId, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime); + bool getCTTPSData(cond::persistency::Session& session, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime); + +private: + bool m_debug; + // starting date for sampling + boost::posix_time::ptime m_startTime; + boost::posix_time::ptime m_endTime; + // sampling interval in seconds + unsigned int m_samplingInterval; + bool m_endFill = true; + std::string m_name; + //for reading from relational database source + std::string m_connectionString, m_ecalConnectionString; + std::string m_dipSchema, m_authpath; + std::string m_omsBaseUrl; + std::unique_ptr m_fillPayload; + std::shared_ptr m_prevPayload; + cond::Time_t m_startFillTime; + cond::Time_t m_endFillTime; + cond::Time_t m_prevEndFillTime; + std::vector > > m_tmpBuffer; + bool m_lastPayloadEmpty = false; +}; + +#endif diff --git a/CondTools/RunInfo/plugins/BuildFile.xml b/CondTools/RunInfo/plugins/BuildFile.xml index 4adf874fba8b1..592b9909bfc42 100644 --- a/CondTools/RunInfo/plugins/BuildFile.xml +++ b/CondTools/RunInfo/plugins/BuildFile.xml @@ -63,4 +63,8 @@ - \ No newline at end of file + + + + + diff --git a/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc b/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc new file mode 100644 index 0000000000000..a33a817d25498 --- /dev/null +++ b/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc @@ -0,0 +1,7 @@ +#include "CondCore/PopCon/interface/PopConAnalyzer.h" +#include "CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h" +#include "FWCore/Framework/interface/MakerMacros.h" + +typedef popcon::PopConAnalyzer LHCInfoPerLSPopConAnalyzer; +//define this as a plug-in +DEFINE_FWK_MODULE(LHCInfoPerLSPopConAnalyzer); diff --git a/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerEndFill.py b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerEndFill.py new file mode 100644 index 0000000000000..3b7ff8f41cd98 --- /dev/null +++ b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerEndFill.py @@ -0,0 +1,88 @@ +import socket +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing +process = cms.Process("LHCInfoPerLSPopulator") +from CondCore.CondDB.CondDB_cfi import * +#process.load("CondCore.DBCommon.CondDBCommon_cfi") +#process.CondDBCommon.connect = 'sqlite_file:lhcinfoperls_pop_test.db' +#process.CondDBCommon.DBParameters.authenticationPath = '.' +#process.CondDBCommon.DBParameters.messageLevel=cms.untracked.int32(1) + +sourceConnection = 'oracle://cms_omds_adg/CMS_RUNINFO_R' +if socket.getfqdn().find('.cms') != -1: + sourceConnection = 'oracle://cms_omds_lb/CMS_RUNINFO_R' + +options = VarParsing.VarParsing() +options.register( 'destinationConnection' + , 'sqlite_file:lhcinfo_pop_test.db' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Connection string to the DB where payloads will be possibly written." + ) +options.register( 'targetConnection' + , '' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Connection string to the target DB: + if not empty (default), this provides the latest IOV and payloads to compare; + it is the DB where payloads should be finally uploaded.""" + ) +options.register( 'tag' + , 'LHCInfoPerLS_PopCon_start_test' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Tag written in destinationConnection and finally appended in targetConnection." + ) +options.register( 'messageLevel' + , 0 #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.int + , "Message level; default to 0" + ) +options.parseArguments() + +CondDBConnection = CondDB.clone( connect = cms.string( options.destinationConnection ) ) +CondDBConnection.DBParameters.messageLevel = cms.untracked.int32( options.messageLevel ) + +process.MessageLogger = cms.Service("MessageLogger", + cout = cms.untracked.PSet(threshold = cms.untracked.string('INFO')), + destinations = cms.untracked.vstring('cout') + ) + +process.source = cms.Source("EmptyIOVSource", + lastValue = cms.uint64(1), + timetype = cms.string('runnumber'), + firstValue = cms.uint64(1), + interval = cms.uint64(1) + ) + +process.PoolDBOutputService = cms.Service("PoolDBOutputService", + CondDBConnection, + timetype = cms.untracked.string('timestamp'), + toPut = cms.VPSet(cms.PSet(record = cms.string('LHCInfoPerLSRcd'), + tag = cms.string( options.tag ) + ) + ) + ) + +process.Test1 = cms.EDAnalyzer("LHCInfoPerLSPopConAnalyzer", + SinceAppendMode = cms.bool(True), + record = cms.string('LHCInfoPerLSRcd'), + name = cms.untracked.string('LHCInfo'), + Source = cms.PSet(fill = cms.untracked.uint32(6417), + startTime = cms.untracked.string('2021-09-10 03:10:18.000'), + # endTime = cms.untracked.string('2018-04-06 05:00:00.000'), + samplingInterval = cms.untracked.uint32( 600 ), + endFill = cms.untracked.bool(True), + connectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_RUNTIME_LOGGER"), + ecalConnectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_DCS_ENV_PVSS_COND"), + DIPSchema = cms.untracked.string("CMS_BEAM_COND"), + omsBaseUrl = cms.untracked.string("http://vocms0184.cern.ch/agg/api/v1"), + #authenticationPath = cms.untracked.string("."), + debug=cms.untracked.bool(False) + ), + loggingOn = cms.untracked.bool(True), + IsDestDbCheckedInQueryLog = cms.untracked.bool(False) + ) + +process.p = cms.Path(process.Test1) diff --git a/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerStartFill.py b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerStartFill.py new file mode 100644 index 0000000000000..ac1461f80c1b4 --- /dev/null +++ b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzerStartFill.py @@ -0,0 +1,88 @@ +import socket +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing +process = cms.Process("LHCInfoPerLSPopulator") +from CondCore.CondDB.CondDB_cfi import * +#process.load("CondCore.DBCommon.CondDBCommon_cfi") +#process.CondDBCommon.connect = 'sqlite_file:lhcinfoperls_pop_test.db' +#process.CondDBCommon.DBParameters.authenticationPath = '.' +#process.CondDBCommon.DBParameters.messageLevel=cms.untracked.int32(1) + +sourceConnection = 'oracle://cms_omds_adg/CMS_RUNINFO_R' +if socket.getfqdn().find('.cms') != -1: + sourceConnection = 'oracle://cms_omds_lb/CMS_RUNINFO_R' + +options = VarParsing.VarParsing() +options.register( 'destinationConnection' + , 'sqlite_file:lhcinfo_pop_test.db' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Connection string to the DB where payloads will be possibly written." + ) +options.register( 'targetConnection' + , '' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Connection string to the target DB: + if not empty (default), this provides the latest IOV and payloads to compare; + it is the DB where payloads should be finally uploaded.""" + ) +options.register( 'tag' + , 'LHCInfoPerLS_PopCon_start_test' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Tag written in destinationConnection and finally appended in targetConnection." + ) +options.register( 'messageLevel' + , 0 #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.int + , "Message level; default to 0" + ) +options.parseArguments() + +CondDBConnection = CondDB.clone( connect = cms.string( options.destinationConnection ) ) +CondDBConnection.DBParameters.messageLevel = cms.untracked.int32( options.messageLevel ) + +process.MessageLogger = cms.Service("MessageLogger", + cout = cms.untracked.PSet(threshold = cms.untracked.string('INFO')), + destinations = cms.untracked.vstring('cout') + ) + +process.source = cms.Source("EmptyIOVSource", + lastValue = cms.uint64(1), + timetype = cms.string('runnumber'), + firstValue = cms.uint64(1), + interval = cms.uint64(1) + ) + +process.PoolDBOutputService = cms.Service("PoolDBOutputService", + CondDBConnection, + timetype = cms.untracked.string('timestamp'), + toPut = cms.VPSet(cms.PSet(record = cms.string('LHCInfoPerLSRcd'), + tag = cms.string( options.tag ) + ) + ) + ) + +process.Test1 = cms.EDAnalyzer("LHCInfoPerLSPopConAnalyzer", + SinceAppendMode = cms.bool(True), + record = cms.string('LHCInfoPerLSRcd'), + name = cms.untracked.string('LHCInfo'), + Source = cms.PSet(fill = cms.untracked.uint32(6417), + startTime = cms.untracked.string('2021-09-10 03:10:18.000'), + # endTime = cms.untracked.string('2018-04-06 05:00:00.000'), + samplingInterval = cms.untracked.uint32( 600 ), + endFill = cms.untracked.bool(False), + connectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_RUNTIME_LOGGER"), + ecalConnectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_DCS_ENV_PVSS_COND"), + DIPSchema = cms.untracked.string("CMS_BEAM_COND"), + omsBaseUrl = cms.untracked.string("http://vocms0184.cern.ch/agg/api/v1"), + #authenticationPath = cms.untracked.string("."), + debug=cms.untracked.bool(False) + ), + loggingOn = cms.untracked.bool(True), + IsDestDbCheckedInQueryLog = cms.untracked.bool(False) + ) + +process.p = cms.Path(process.Test1) diff --git a/CondTools/RunInfo/src/LHCInfoPerLSPopConSourceHandler.cc b/CondTools/RunInfo/src/LHCInfoPerLSPopConSourceHandler.cc new file mode 100644 index 0000000000000..eadd16227f46c --- /dev/null +++ b/CondTools/RunInfo/src/LHCInfoPerLSPopConSourceHandler.cc @@ -0,0 +1,479 @@ +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "CondCore/CondDB/interface/ConnectionPool.h" +#include "CondFormats/Common/interface/TimeConversions.h" +#include "CondTools/RunInfo/interface/LHCInfoPerLSPopConSourceHandler.h" +#include "CondTools/RunInfo/interface/OMSAccess.h" +#include "RelationalAccess/ISessionProxy.h" +#include "RelationalAccess/ISchema.h" +#include "RelationalAccess/IQuery.h" +#include "RelationalAccess/ICursor.h" +#include "CoralBase/AttributeList.h" +#include "CoralBase/Attribute.h" +#include "CoralBase/AttributeSpecification.h" +#include "CoralBase/TimeStamp.h" +#include +#include +#include +#include +#include +#include +#include + +LHCInfoPerLSPopConSourceHandler::LHCInfoPerLSPopConSourceHandler(edm::ParameterSet const& pset) + : m_debug(pset.getUntrackedParameter("debug", false)), + m_startTime(), + m_endTime(), + m_samplingInterval((unsigned int)pset.getUntrackedParameter("samplingInterval", 300)), + m_endFill(pset.getUntrackedParameter("endFill", true)), + m_name(pset.getUntrackedParameter("name", "LHCInfoPerLSPopConSourceHandler")), + m_connectionString(pset.getUntrackedParameter("connectionString", "")), + m_ecalConnectionString(pset.getUntrackedParameter("ecalConnectionString", "")), + m_dipSchema(pset.getUntrackedParameter("DIPSchema", "")), + m_authpath(pset.getUntrackedParameter("authenticationPath", "")), + m_omsBaseUrl(pset.getUntrackedParameter("omsBaseUrl", "")), + m_fillPayload(), + m_prevPayload(), + m_tmpBuffer() { + if (pset.exists("startTime")) { + m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("startTime")); + } + boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + m_endTime = now; + if (pset.exists("endTime")) { + m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("endTime")); + if (m_endTime > now) + m_endTime = now; + } +} +//L1: try with different m_dipSchema +//L2: try with different m_name +LHCInfoPerLSPopConSourceHandler::~LHCInfoPerLSPopConSourceHandler() {} + +namespace LHCInfoPerLSImpl { + + struct IOVComp { + bool operator()(const cond::Time_t& x, const std::pair>& y) { + return (x < y.first); + } + }; + + // function to search in the vector the target time + std::vector>>::const_iterator search( + const cond::Time_t& val, const std::vector>>& container) { + if (container.empty()) + return container.end(); + auto p = std::upper_bound(container.begin(), container.end(), val, IOVComp()); + return (p != container.begin()) ? p - 1 : container.end(); + } + +} // namespace LHCInfoPerLSImpl + +bool LHCInfoPerLSPopConSourceHandler::makeFillPayload (std::unique_ptr& targetPayload, + const cond::OMSServiceResult& queryResult) { + bool ret = false; + if (!queryResult.empty()) { + auto row = *queryResult.begin(); + auto currentFill = row.get("fill_number"); + m_startFillTime = cond::time::from_boost(row.get("start_time")); + m_endFillTime = cond::time::from_boost(row.get("end_time")); + targetPayload = std::make_unique(); + targetPayload->setFillNumber(currentFill); + ret = true; + } + return ret; +} + +size_t LHCInfoPerLSPopConSourceHandler::getLumiData(const cond::OMSService& oms, + unsigned short fillId, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + auto query = oms.query("lumisections"); + query->addOutputVars({"start_time", "delivered_lumi", "recorded_lumi"}); + query->filterEQ("fill_number", fillId); + query->filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime); + query->limit(kLumisectionsQueryLimit); + size_t nlumi = 0; + if (query->execute()) { + auto res = query->result(); + std::stringstream condIovs; + std::stringstream posixIovs; + for (auto r : res) { + nlumi++; + auto lumiTime = r.get("start_time"); + LHCInfoPerLS* thisLumiSectionInfo = new LHCInfoPerLS(*m_fillPayload); + m_tmpBuffer.emplace_back(std::make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo)); + LHCInfoPerLS& payload = *thisLumiSectionInfo; + + condIovs << cond::time::from_boost(lumiTime) << " "; + posixIovs << lumiTime << " "; + } + + } + return nlumi; +} + +namespace LHCInfoPerLSImpl { + struct LumiSectionFilter { + LumiSectionFilter(const std::vector>>& samples) + : currLow(samples.begin()), currUp(samples.begin()), end(samples.end()) { + currUp++; + } + + void reset(const std::vector>>& samples) { + currLow = samples.begin(); + currUp = samples.begin(); + currUp++; + end = samples.end(); + currentDipTime = 0; + } + + bool process(cond::Time_t dipTime) { + if (currLow == end) + return false; + bool search = false; + if (currentDipTime == 0) { + search = true; + } else { + if (dipTime == currentDipTime) + return true; + else { + cond::Time_t upper = cond::time::MAX_VAL; + if (currUp != end) + upper = currUp->first; + if (dipTime < upper) + return false; + else { + search = true; + } + } + } + if (search) { + while (currUp != end and currUp->first < dipTime) { + currLow++; + currUp++; + } + currentDipTime = dipTime; + return currLow != end; + } + return false; + } + + cond::Time_t currentSince() { return currLow->first; } + LHCInfoPerLS& currentPayload() { return *currLow->second; } + + std::vector>>::const_iterator current() { return currLow; } + std::vector>>::const_iterator currLow; + std::vector>>::const_iterator currUp; + std::vector>>::const_iterator end; + cond::Time_t currentDipTime = 0; + }; +} // namespace LHCInfoPerLSImpl + + +bool LHCInfoPerLSPopConSourceHandler::getCTTPSData(cond::persistency::Session& session, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + //run the fifth query against the CTPPS schema + //Initializing the CMS_CTP_CTPPS_COND schema. + coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND"); + //execute query for CTPPS Data + std::unique_ptr CTPPSDataQuery(CTPPS.newQuery()); + //FROM clause + CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS")); + //SELECT clause + CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME")); + CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION")); + CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_X_URAD")); + CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_X_M")); + //WHERE CLAUSE + coral::AttributeList CTPPSDataBindVariables; + CTPPSDataBindVariables.extend(std::string("beginFillTime")); + CTPPSDataBindVariables.extend(std::string("endFillTime")); + CTPPSDataBindVariables[std::string("beginFillTime")].data() = coral::TimeStamp(beginFillTime); + CTPPSDataBindVariables[std::string("endFillTime")].data() = coral::TimeStamp(endFillTime); + std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime"); + CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables); + //ORDER BY clause + CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME")); + //define query output + coral::AttributeList CTPPSDataOutput; + CTPPSDataOutput.extend(std::string("DIP_UPDATE_TIME")); + CTPPSDataOutput.extend(std::string("LUMI_SECTION")); + CTPPSDataOutput.extend(std::string("XING_ANGLE_P5_X_URAD")); + CTPPSDataOutput.extend(std::string("BETA_STAR_P5_X_M")); + CTPPSDataQuery->defineOutput(CTPPSDataOutput); + //execute the query + coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute(); + cond::Time_t dipTime = 0; + unsigned int lumiSection = 0; + float crossingAngle = 0., betastar = 0.; + + bool ret = false; + LHCInfoPerLSImpl::LumiSectionFilter filter(m_tmpBuffer); + while (CTPPSDataCursor.next()) { + if (m_debug) { + std::ostringstream CTPPS; + CTPPSDataCursor.currentRow().toOutputStream(CTPPS); + } + coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")]; + if (!dipTimeAttribute.isNull()) { + dipTime = cond::time::from_boost(dipTimeAttribute.data().time()); + if (filter.process(dipTime)) { + ret = true; + coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")]; + if (!lumiSectionAttribute.isNull()) { + lumiSection = lumiSectionAttribute.data(); + } + coral::Attribute const& crossingAngleXAttribute = + CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_X_URAD")]; + if (!crossingAngleXAttribute.isNull()) { + crossingAngle = crossingAngleXAttribute.data(); + } + coral::Attribute const& betaStarXAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_X_M")]; + if (!betaStarXAttribute.isNull()) { + betastar = betaStarXAttribute.data(); + } + for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) { + // set the current values to all of the payloads of the lumi section samples after the current since + LHCInfoPerLS& payload = *(it->second); + payload.setCrossingAngleX(crossingAngle); + payload.setBetaStarX(betastar); + payload.setLumiSection(lumiSection); + } + } + } + } + return ret; +} + +void LHCInfoPerLSPopConSourceHandler::addEmptyPayload(cond::Time_t iov) { + bool add = false; + if (m_iovs.empty()) { + if (!m_lastPayloadEmpty) + add = true; + } else { + auto lastAdded = m_iovs.rbegin()->second; + if (lastAdded->fillNumber() != 0) { + add = true; + } + } + if (add) { + auto newPayload = std::make_shared(); + m_iovs.insert(std::make_pair(iov, newPayload)); + m_prevPayload = newPayload; + m_prevEndFillTime = 0; + } +} + +namespace LHCInfoPerLSImpl { + bool comparePayloads(const LHCInfoPerLS& rhs, const LHCInfoPerLS& lhs) { + if (rhs.fillNumber() != lhs.fillNumber()) + return false; + if (rhs.runNumber() != lhs.runNumber()) + return false; + if (rhs.crossingAngleX() != lhs.crossingAngleX()) + return false; + if (rhs.crossingAngleY() != lhs.crossingAngleY()) + return false; + if (rhs.betaStarX() != lhs.betaStarX()) + return false; + if (rhs.betaStarY() != lhs.betaStarY()) + return false; + return true; + } + + size_t transferPayloads(const std::vector>>& buffer, + std::map>& iovsToTransfer, + std::shared_ptr& prevPayload) { + size_t niovs = 0; + std::stringstream condIovs; + for (auto& iov : buffer) { + bool add = false; + auto payload = iov.second; + cond::Time_t since = iov.first; + if (iovsToTransfer.empty()) { + add = true; + } else { + LHCInfoPerLS& lastAdded = *iovsToTransfer.rbegin()->second; + if (!comparePayloads(lastAdded, *payload)) { + add = true; + } + } + if (add) { + niovs++; + condIovs << since << " "; + iovsToTransfer.insert(std::make_pair(since, payload)); + prevPayload = iov.second; + } + } + edm::LogInfo("transferPayloads") << "TRANSFERED COND IOVS: " << condIovs.str(); + return niovs; + } + +} // namespace LHCInfoPerLSImpl + +void LHCInfoPerLSPopConSourceHandler::getNewObjects() { + //if a new tag is created, transfer fake fill from 1 to the first fill for the first time + if (tagInfo().size == 0) { + edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects"; + } else { + //check what is already inside the database + edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size + << ", last object valid since " << tagInfo().lastInterval.since << " ( " + << boost::posix_time::to_iso_extended_string( + cond::time::to_boost(tagInfo().lastInterval.since)) + << " ); from " << m_name << "::getNewObjects"; + } + + cond::Time_t lastSince = tagInfo().lastInterval.since; + if (tagInfo().isEmpty()) { + // for a new or empty tag, an empty payload should be added on top with since=1 + addEmptyPayload(1); + lastSince = 1; + } else { + edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from " + << m_name << "::getNewObjects"; + } + + boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time(); + cond::Time_t targetSince = 0; + cond::Time_t endIov = cond::time::from_boost(executionTime); + if (!m_startTime.is_not_a_date_time()) { + targetSince = cond::time::from_boost(m_startTime); + } + if (lastSince > targetSince) + targetSince = lastSince; + + edm::LogInfo(m_name) << "Starting sampling at " + << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince)); + + //retrieve the data from the relational database source + cond::persistency::ConnectionPool connection; + //configure the connection + if (m_debug) { + connection.setMessageVerbosity(coral::Debug); + } else { + connection.setMessageVerbosity(coral::Error); + } + connection.setAuthenticationPath(m_authpath); + connection.configure(); + //create the sessions + cond::persistency::Session session = connection.createSession(m_connectionString, false); + cond::persistency::Session session2 = connection.createSession(m_ecalConnectionString, false); + // fetch last payload when available + if (!tagInfo().lastInterval.payloadId.empty()) { + cond::persistency::Session session3 = dbSession(); + session3.transaction().start(true); + m_prevPayload = session3.fetchPayload(tagInfo().lastInterval.payloadId); + session3.transaction().commit(); + if(m_prevPayload->fillNumber() != 0) + { + cond::OMSService oms; + oms.connect(m_omsBaseUrl); + auto query = oms.query("fills"); + query->addOutputVar("end_time"); + query->filterEQ("fill_number", m_prevPayload->fillNumber()); + bool foundFill = query->execute(); + if (foundFill) { + auto result = query->result(); + + if (!result.empty()) { + auto endFillTime = (*result.begin()).get("end_time"); + m_prevEndFillTime = cond::time::from_boost(endFillTime); + } + else { + foundFill = false; + } + } + if (!foundFill) { + edm::LogError(m_name) << "Could not find end time of fill #" << m_prevPayload->fillNumber(); + } + } + else + { + m_prevEndFillTime = 0; + } + } + + bool iovAdded = false; + while (true) { + if (targetSince >= endIov) { + edm::LogInfo(m_name) << "Sampling ended at the time " + << boost::posix_time::to_simple_string(cond::time::to_boost(endIov)); + break; + } + boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince); + boost::posix_time::ptime startSampleTime; + boost::posix_time::ptime endSampleTime; + + cond::OMSService oms; + oms.connect(m_omsBaseUrl); + auto query = oms.query("fills"); + + if (!m_endFill and m_prevPayload->fillNumber() and m_prevEndFillTime == 0ULL) { + // execute the query for the current fill + edm::LogInfo(m_name) << "Searching started fill #" << m_prevPayload->fillNumber(); + query->filterEQ("fill_number", m_prevPayload->fillNumber()); + bool foundFill = query->execute(); + if (foundFill) + foundFill = makeFillPayload(m_fillPayload, query->result()); + if (!foundFill) { + edm::LogError(m_name) << "Could not find fill #" << m_prevPayload->fillNumber(); + break; + } + startSampleTime = cond::time::to_boost(lastSince); + } else { + edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime); + boost::posix_time::ptime startTime = targetTime + boost::posix_time::seconds(1); + query->filterNotNull("start_stable_beam").filterGT("start_time", startTime).filterNotNull("fill_number"); + if (m_endFill) + query->filterNotNull("end_time"); + bool foundFill = query->execute(); + if (foundFill) + foundFill = makeFillPayload(m_fillPayload, query->result()); + if (!foundFill) { + edm::LogInfo(m_name) << "No fill found - END of job."; + if (iovAdded) + addEmptyPayload(targetSince); + break; + } + startSampleTime = cond::time::to_boost(m_startFillTime); + } + + unsigned short lhcFill = m_fillPayload->fillNumber(); + if (m_endFillTime == 0ULL) { + edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at " << cond::time::to_boost(m_startFillTime); + endSampleTime = executionTime; + targetSince = endIov; + } else { + edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(m_startFillTime) + << " ending at " << cond::time::to_boost(m_endFillTime); + endSampleTime = cond::time::to_boost(m_endFillTime); + targetSince = m_endFillTime; + } + + + size_t nlumi = getLumiData(oms, lhcFill, startSampleTime, endSampleTime); + edm::LogInfo(m_name) << "Found " << nlumi << " lumisections during the fill " << lhcFill; + boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first); + boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first); + edm::LogInfo(m_name) << "First lumi starts at " << flumiStart << " last lumi starts at " << flumiStop; + session.transaction().start(true); + getCTTPSData(session, startSampleTime, endSampleTime); + session.transaction().commit(); + + // + size_t niovs = LHCInfoPerLSImpl::transferPayloads(m_tmpBuffer, m_iovs, m_prevPayload); + edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time"; + if(niovs){ + m_prevEndFillTime = m_endFillTime; + } + m_tmpBuffer.clear(); + iovAdded = true; + if (m_prevPayload->fillNumber() and m_endFillTime != 0ULL) + addEmptyPayload(m_endFillTime); + } +} + +std::string LHCInfoPerLSPopConSourceHandler::id() const { return m_name; }