diff --git a/CondTools/RunInfo/interface/OMSAccess.h b/CondTools/RunInfo/interface/OMSAccess.h index 2cbecced23fee..104dd6073cc77 100644 --- a/CondTools/RunInfo/interface/OMSAccess.h +++ b/CondTools/RunInfo/interface/OMSAccess.h @@ -52,6 +52,12 @@ namespace cond { unsigned long int_val = from_string_impl(attributeValue, 0); return (unsigned short)int_val; } + inline unsigned long long s_to_ull(const std::string& val) { return std::stoull(val); } + template <> + inline unsigned long long from_string(const std::string& attributeValue) { + unsigned long long int_val = from_string_impl(attributeValue, 0); + return int_val; + } inline boost::posix_time::ptime s_to_time(const std::string& val) { boost::posix_time::time_input_facet* facet = new boost::posix_time::time_input_facet(OMS_TIME_FMT); @@ -147,6 +153,9 @@ namespace cond { OMSServiceResultIterator begin() const; OMSServiceResultIterator end() const; + OMSServiceResultRef front() const; + OMSServiceResultRef back() const; + // parse json returned from curl, filling the property tree size_t parseData(const std::string& data); diff --git a/CondTools/RunInfo/plugins/BuildFile.xml b/CondTools/RunInfo/plugins/BuildFile.xml index 6fff2bf931f54..7463d7150b208 100644 --- a/CondTools/RunInfo/plugins/BuildFile.xml +++ b/CondTools/RunInfo/plugins/BuildFile.xml @@ -49,6 +49,26 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/CondTools/RunInfo/plugins/LHCInfoPerFillPopConAnalyzer.cc b/CondTools/RunInfo/plugins/LHCInfoPerFillPopConAnalyzer.cc new file mode 100644 index 0000000000000..4757ae0ce420e --- /dev/null +++ b/CondTools/RunInfo/plugins/LHCInfoPerFillPopConAnalyzer.cc @@ -0,0 +1,736 @@ +#include "CondCore/CondDB/interface/ConnectionPool.h" +#include "CondCore/PopCon/interface/PopConAnalyzer.h" +#include "CondCore/PopCon/interface/PopConSourceHandler.h" +#include "CondFormats/Common/interface/TimeConversions.h" +#include "CondFormats/RunInfo/interface/LHCInfoPerFill.h" +#include "CondTools/RunInfo/interface/OMSAccess.h" +#include "CoralBase/Attribute.h" +#include "CoralBase/AttributeList.h" +#include "CoralBase/AttributeSpecification.h" +#include "CoralBase/TimeStamp.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" +#include "LumiSectionFilter.h" +#include "RelationalAccess/ICursor.h" +#include "RelationalAccess/IQuery.h" +#include "RelationalAccess/ISchema.h" +#include "RelationalAccess/ISessionProxy.h" +#include +#include +#include +#include +#include +#include +#include + +class LHCInfoPerFillPopConSourceHandler; + +typedef popcon::PopConAnalyzer LHCInfoPerFillPopConAnalyzer; +//define this as a plug-in +DEFINE_FWK_MODULE(LHCInfoPerFillPopConAnalyzer); + +namespace cond { + namespace theLHCInfoPerFillPopConImpl { + + static const std::pair s_fillTypeMap[] = { + std::make_pair("PROTONS", LHCInfoPerFill::PROTONS), + std::make_pair("IONS", LHCInfoPerFill::IONS), + std::make_pair("COSMICS", LHCInfoPerFill::COSMICS), + std::make_pair("GAP", LHCInfoPerFill::GAP)}; + + static const std::pair s_particleTypeMap[] = { + std::make_pair("PROTON", LHCInfoPerFill::PROTON), + std::make_pair("PB82", LHCInfoPerFill::PB82), + std::make_pair("AR18", LHCInfoPerFill::AR18), + std::make_pair("D", LHCInfoPerFill::D), + std::make_pair("XE54", LHCInfoPerFill::XE54)}; + + LHCInfoPerFill::FillType fillTypeFromString(const std::string& s_fill_type) { + for (auto const& i : s_fillTypeMap) + if (s_fill_type == i.first) + return i.second; + return LHCInfoPerFill::UNKNOWN; + } + + LHCInfoPerFill::ParticleType particleTypeFromString(const std::string& s_particle_type) { + for (auto const& i : s_particleTypeMap) + if (s_particle_type == i.first) + return i.second; + return LHCInfoPerFill::NONE; + } + } // namespace theLHCInfoPerFillPopConImpl + + namespace impl { + + template <> + LHCInfoPerFill::FillType from_string(const std::string& attributeValue) { + return from_string_impl( + attributeValue, LHCInfoPerFill::UNKNOWN); + } + + template <> + LHCInfoPerFill::ParticleType from_string(const std::string& attributeValue) { + return from_string_impl( + attributeValue, LHCInfoPerFill::NONE); + } + + } // namespace impl +} // namespace cond + +namespace theLHCInfoPerFillImpl { + + bool makeFillPayload(std::unique_ptr& targetPayload, const cond::OMSServiceResult& queryResult) { + bool ret = false; + if (!queryResult.empty()) { + auto row = *queryResult.begin(); + auto currentFill = row.get("fill_number"); + auto bunches1 = row.get("bunches_beam1"); + auto bunches2 = row.get("bunches_beam2"); + auto collidingBunches = row.get("bunches_colliding"); + auto targetBunches = row.get("bunches_target"); + auto fillType = row.get("fill_type_runtime"); + auto particleType1 = row.get("fill_type_party1"); + auto particleType2 = row.get("fill_type_party2"); + auto intensityBeam1 = row.get("intensity_beam1"); + auto intensityBeam2 = row.get("intensity_beam2"); + auto energy = row.get("energy"); + auto creationTime = row.get("start_time"); + auto stableBeamStartTime = row.get("start_stable_beam"); + auto beamDumpTime = row.get("end_time"); + auto injectionScheme = row.get("injection_scheme"); + targetPayload = std::make_unique(); + targetPayload->setFillNumber(currentFill); + targetPayload->setBunchesInBeam1(bunches1); + targetPayload->setBunchesInBeam2(bunches2); + targetPayload->setCollidingBunches(collidingBunches); + targetPayload->setTargetBunches(targetBunches); + targetPayload->setFillType(fillType); + targetPayload->setParticleTypeForBeam1(particleType1); + targetPayload->setParticleTypeForBeam2(particleType2); + targetPayload->setIntensityForBeam1(intensityBeam1); + targetPayload->setIntensityForBeam2(intensityBeam2); + targetPayload->setEnergy(energy); + targetPayload->setCreationTime(cond::time::from_boost(creationTime)); + targetPayload->setBeginTime(cond::time::from_boost(stableBeamStartTime)); + targetPayload->setEndTime(cond::time::from_boost(beamDumpTime)); + targetPayload->setInjectionScheme(injectionScheme); + ret = true; + } + return ret; + } +} // namespace theLHCInfoPerFillImpl + +namespace theLHCInfoPerFillImpl { + static const std::map vecMap = { + {"Beam1/beamPhaseMean", 1}, {"Beam2/beamPhaseMean", 2}, {"Beam1/cavPhaseMean", 3}, {"Beam2/cavPhaseMean", 4}}; + void setElementData(cond::Time_t since, + const std::string& dipVal, + unsigned int elementNr, + float value, + LHCInfoPerFill& payload, + std::set& initList) { + if (initList.find(since) == initList.end()) { + payload.beam1VC().resize(LHCInfoPerFill::bunchSlots, 0.); + payload.beam2VC().resize(LHCInfoPerFill::bunchSlots, 0.); + payload.beam1RF().resize(LHCInfoPerFill::bunchSlots, 0.); + payload.beam2RF().resize(LHCInfoPerFill::bunchSlots, 0.); + initList.insert(since); + } + // set the current values to all of the payloads of the lumi section samples after the current since + if (elementNr < LHCInfoPerFill::bunchSlots) { + switch (vecMap.at(dipVal)) { + case 1: + payload.beam1VC()[elementNr] = value; + break; + case 2: + payload.beam2VC()[elementNr] = value; + break; + case 3: + payload.beam1RF()[elementNr] = value; + break; + case 4: + payload.beam2RF()[elementNr] = value; + break; + default: + break; + } + } + } +} // namespace theLHCInfoPerFillImpl + +namespace theLHCInfoPerFillImpl { + bool comparePayloads(const LHCInfoPerFill& rhs, const LHCInfoPerFill& lhs) { + if (rhs.fillNumber() != lhs.fillNumber() || rhs.delivLumi() != lhs.delivLumi() || rhs.recLumi() != lhs.recLumi() || + rhs.instLumi() != lhs.instLumi() || rhs.instLumiError() != lhs.instLumiError() || + rhs.lhcState() != rhs.lhcState() || rhs.lhcComment() != rhs.lhcComment() || + rhs.ctppsStatus() != rhs.ctppsStatus()) { + return false; + } + return true; + } + + size_t transferPayloads(const std::vector>>& buffer, + std::map>& iovsToTransfer, + std::shared_ptr& prevPayload) { + size_t niovs = 0; + std::stringstream condIovs; + std::stringstream formattedIovs; + for (auto& iov : buffer) { + bool add = false; + auto payload = iov.second; + cond::Time_t since = iov.first; + if (iovsToTransfer.empty()) { + add = true; + } else { + LHCInfoPerFill& lastAdded = *iovsToTransfer.rbegin()->second; + if (!comparePayloads(lastAdded, *payload)) { + add = true; + } + } + if (add) { + niovs++; + condIovs << since << " "; + formattedIovs << boost::posix_time::to_iso_extended_string(cond::time::to_boost(since)) << " "; + iovsToTransfer.insert(std::make_pair(since, payload)); + prevPayload = iov.second; + } + } + edm::LogInfo("transferPayloads") << "TRANSFERED IOVS: " << condIovs.str(); + edm::LogInfo("transferPayloads") << "FORMATTED TRANSFERED IOVS: " << formattedIovs.str(); + return niovs; + } + +} // namespace theLHCInfoPerFillImpl +class LHCInfoPerFillPopConSourceHandler : public popcon::PopConSourceHandler { +public: + LHCInfoPerFillPopConSourceHandler(edm::ParameterSet const& pset) + : m_debug(pset.getUntrackedParameter("debug", false)), + m_startTime(), + m_endTime(), + m_samplingInterval((unsigned int)pset.getUntrackedParameter("samplingInterval", 300)), + m_endFillMode(pset.getUntrackedParameter("endFill", true)), + m_name(pset.getUntrackedParameter("name", "LHCInfoPerFillPopConSourceHandler")), + m_connectionString(pset.getUntrackedParameter("connectionString", "")), + m_ecalConnectionString(pset.getUntrackedParameter("ecalConnectionString", "")), + m_dipSchema(pset.getUntrackedParameter("DIPSchema", "")), + m_authpath(pset.getUntrackedParameter("authenticationPath", "")), + m_omsBaseUrl(pset.getUntrackedParameter("omsBaseUrl", "")), + m_fillPayload(), + m_prevPayload(), + m_tmpBuffer() { + if (!pset.getUntrackedParameter("startTime").empty()) { + m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("startTime")); + } + boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + m_endTime = now; + if (!pset.getUntrackedParameter("endTime").empty()) { + m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("endTime")); + if (m_endTime > now) + m_endTime = now; + } + } + //L1: try with different m_dipSchema + //L2: try with different m_name + ~LHCInfoPerFillPopConSourceHandler() override = default; + void getNewObjects() override { + //reference to the last payload in the tag + Ref previousFill; + + //if a new tag is created, transfer fake fill from 1 to the first fill for the first time + if (tagInfo().size == 0) { + edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects"; + } else { + //check what is already inside the database + edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size + << ", last object valid since " << tagInfo().lastInterval.since << " ( " + << boost::posix_time::to_iso_extended_string( + cond::time::to_boost(tagInfo().lastInterval.since)) + << " ); from " << m_name << "::getNewObjects"; + } + + cond::Time_t lastSince = tagInfo().lastInterval.since; + if (tagInfo().isEmpty()) { + // for a new or empty tag, an empty payload should be added on top with since=1 + addEmptyPayload(1); + lastSince = 1; + } else { + edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from " + << m_name << "::getNewObjects"; + } + + boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time(); + cond::Time_t targetSince = 0; + cond::Time_t executionTimeIov = cond::time::from_boost(executionTime); + if (!m_startTime.is_not_a_date_time()) { + targetSince = cond::time::from_boost(m_startTime); + } + if (lastSince > targetSince) + targetSince = lastSince; + + edm::LogInfo(m_name) << "Starting sampling at " + << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince)); + + //retrieve the data from the relational database source + cond::persistency::ConnectionPool connection; + //configure the connection + if (m_debug) { + connection.setMessageVerbosity(coral::Debug); + } else { + connection.setMessageVerbosity(coral::Error); + } + connection.setAuthenticationPath(m_authpath); + connection.configure(); + //create the sessions + cond::persistency::Session session = connection.createSession(m_connectionString, false); + cond::persistency::Session session2 = connection.createSession(m_ecalConnectionString, false); + // fetch last payload when available + if (!tagInfo().lastInterval.payloadId.empty()) { + cond::persistency::Session session3 = dbSession(); + session3.transaction().start(true); + m_prevPayload = session3.fetchPayload(tagInfo().lastInterval.payloadId); + session3.transaction().commit(); + } + + // bool iovAdded = false; + while (true) { + if (targetSince >= executionTimeIov) { + edm::LogInfo(m_name) << "Sampling ended at the time " + << boost::posix_time::to_simple_string(cond::time::to_boost(executionTimeIov)); + break; + } + bool updateEcal = false; + boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince); + boost::posix_time::ptime startSampleTime; + boost::posix_time::ptime endSampleTime; + + cond::OMSService oms; + oms.connect(m_omsBaseUrl); + auto query = oms.query("fills"); + + edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime); + query->filterNotNull("start_stable_beam").filterNotNull("fill_number"); + if (targetTime > cond::time::to_boost(m_prevPayload->createTime())) { + query->filterGE("start_time", targetTime); + } else { + query->filterGT("start_time", targetTime); + } + + query->filterLT("start_time", m_endTime); + if (m_endFillMode) + query->filterNotNull("end_time"); + bool foundFill = query->execute(); + if (foundFill) + foundFill = theLHCInfoPerFillImpl::makeFillPayload(m_fillPayload, query->result()); + if (!foundFill) { + edm::LogInfo(m_name) << "No fill found - END of job."; + // if (iovAdded) + // addEmptyPayload(targetSince); + break; + } + + startSampleTime = cond::time::to_boost(m_fillPayload->createTime()); + cond::Time_t startFillTime = m_fillPayload->createTime(); + cond::Time_t endFillTime = m_fillPayload->endTime(); + unsigned short lhcFill = m_fillPayload->fillNumber(); + bool ongoingFill = endFillTime == 0ULL; + if (ongoingFill) { + edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at " + << cond::time::to_boost(startFillTime); + endSampleTime = executionTime; + targetSince = executionTimeIov; + } else { + edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(startFillTime) + << " ending at " << cond::time::to_boost(endFillTime); + endSampleTime = cond::time::to_boost(endFillTime); + targetSince = endFillTime; + } + if (m_endFillMode || ongoingFill) { + getDipData(oms, startSampleTime, endSampleTime); + getLumiData(oms, lhcFill, startSampleTime, endSampleTime); + if (!m_tmpBuffer.empty()) { + boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first); + boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first); + edm::LogInfo(m_name) << "First lumi starts at " << flumiStart << " last lumi starts at " << flumiStop; + session.transaction().start(true); + getCTTPSData(session, startSampleTime, endSampleTime); + session.transaction().commit(); + session2.transaction().start(true); + getEcalData(session2, startSampleTime, endSampleTime, updateEcal); + session2.transaction().commit(); + } + } + + size_t niovs = theLHCInfoPerFillImpl::transferPayloads(m_tmpBuffer, m_iovs, m_prevPayload); + edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time"; + m_tmpBuffer.clear(); + // iovAdded = true; + if (m_prevPayload->fillNumber() and !ongoingFill) + addEmptyPayload(endFillTime); + } + } + + std::string id() const override { return m_name; } + + static constexpr unsigned int kLumisectionsQueryLimit = 4000; + +private: + void addEmptyPayload(cond::Time_t iov) { + bool add = false; + if (m_iovs.empty()) { + if (!m_lastPayloadEmpty) + add = true; + } else { + auto lastAdded = m_iovs.rbegin()->second; + if (lastAdded->fillNumber() != 0) { + add = true; + } + } + if (add) { + auto newPayload = std::make_shared(); + m_iovs.insert(std::make_pair(iov, newPayload)); + m_prevPayload = newPayload; + edm::LogInfo(m_name) << "Added empty payload with IOV " << iov << " ( " + << boost::posix_time::to_iso_extended_string(cond::time::to_boost(iov)) << " )"; + } + } + + void addPayloadToBuffer(cond::OMSServiceResultRef& row) { + auto lumiTime = row.get("start_time"); + auto delivLumi = row.get("delivered_lumi"); + auto recLumi = row.get("recorded_lumi"); + LHCInfoPerFill* thisLumiSectionInfo = m_fillPayload->cloneFill(); + m_tmpBuffer.emplace_back(std::make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo)); + LHCInfoPerFill& payload = *thisLumiSectionInfo; + payload.setDelivLumi(delivLumi); + payload.setRecLumi(recLumi); + } + + size_t getLumiData(const cond::OMSService& oms, + unsigned short fillId, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + auto query = oms.query("lumisections"); + query->addOutputVars({"start_time", "delivered_lumi", "recorded_lumi", "beams_stable"}); + query->filterEQ("fill_number", fillId); + query->filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime); + query->filterEQ("beams_stable", "true"); + query->limit(kLumisectionsQueryLimit); + if (query->execute()) { + int nLumi = 0; + auto queryResult = query->result(); + edm::LogInfo(m_name) << "Found " << queryResult.size() << " lumisections with STABLE BEAM during the fill " + << fillId; + + if (!queryResult.empty()) { + if (m_endFillMode) { + auto firstRow = queryResult.front(); + addPayloadToBuffer(firstRow); + nLumi++; + } + + auto lastRow = queryResult.back(); + addPayloadToBuffer(lastRow); + nLumi++; + } + } + return 0; + } + + void getDipData(const cond::OMSService& oms, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + // unsure how to handle this. + // the old implementation is not helping: apparently it is checking only the bunchconfiguration for the first diptime set of values... + auto query1 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam1"); + query1->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime); + //This query is limited to 100 rows, but currently only one is used + //If all this data is needed and saved properly the limit has to be set: query1->limit(...) + if (query1->execute()) { + auto res = query1->result(); + if (!res.empty()) { + std::bitset bunchConfiguration1(0ULL); + auto row = *res.begin(); + auto vbunchConf1 = row.getArray("value"); + for (auto vb : vbunchConf1) { + if (vb != 0) { + unsigned short slot = (vb - 1) / 10 + 1; + bunchConfiguration1[slot] = true; + } + } + m_fillPayload->setBunchBitsetForBeam1(bunchConfiguration1); + } + } + auto query2 = oms.query("diplogger/dip/acc/LHC/RunControl/CirculatingBunchConfig/Beam2"); + query2->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime); + //This query is limited to 100 rows, but currently only one is used + if (query2->execute()) { + auto res = query2->result(); + if (!res.empty()) { + std::bitset bunchConfiguration2(0ULL); + auto row = *res.begin(); + auto vbunchConf2 = row.getArray("value"); + for (auto vb : vbunchConf2) { + if (vb != 0) { + unsigned short slot = (vb - 1) / 10 + 1; + bunchConfiguration2[slot] = true; + } + } + m_fillPayload->setBunchBitsetForBeam2(bunchConfiguration2); + } + } + + auto query3 = oms.query("diplogger/dip/CMS/LHC/LumiPerBunch"); + query3->filterGT("dip_time", beginFillTime).filterLT("dip_time", endFillTime); + //This query is limited to 100 rows, but currently only one is used + if (query3->execute()) { + auto res = query3->result(); + if (!res.empty()) { + std::vector lumiPerBX; + auto row = *res.begin(); + auto lumiBunchInst = row.getArray("lumi_bunch_inst"); + for (auto lb : lumiBunchInst) { + if (lb != 0.) { + lumiPerBX.push_back(lb); + } + } + m_fillPayload->setLumiPerBX(lumiPerBX); + } + } + } + + bool getCTTPSData(cond::persistency::Session& session, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + //run the fifth query against the CTPPS schema + //Initializing the CMS_CTP_CTPPS_COND schema. + coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND"); + //execute query for CTPPS Data + std::unique_ptr CTPPSDataQuery(CTPPS.newQuery()); + //FROM clause + CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS")); + //SELECT clause + CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME")); + CTPPSDataQuery->addToOutputList(std::string("LHC_STATE")); + CTPPSDataQuery->addToOutputList(std::string("LHC_COMMENT")); + if (m_debug) { + CTPPSDataQuery->addToOutputList(std::string("RUN_NUMBER")); + CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION")); + } + //WHERE CLAUSE + coral::AttributeList CTPPSDataBindVariables; + CTPPSDataBindVariables.extend(std::string("beginFillTime")); + CTPPSDataBindVariables.extend(std::string("endFillTime")); + CTPPSDataBindVariables[std::string("beginFillTime")].data() = coral::TimeStamp(beginFillTime); + CTPPSDataBindVariables[std::string("endFillTime")].data() = coral::TimeStamp(endFillTime); + std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime"); + CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables); + //ORDER BY clause + CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME")); + //define query output + coral::AttributeList CTPPSDataOutput; + CTPPSDataOutput.extend(std::string("DIP_UPDATE_TIME")); + CTPPSDataOutput.extend(std::string("LHC_STATE")); + CTPPSDataOutput.extend(std::string("LHC_COMMENT")); + if (m_debug) { + CTPPSDataOutput.extend(std::string("RUN_NUMBER")); + CTPPSDataOutput.extend(std::string("LUMI_SECTION")); + } + CTPPSDataQuery->defineOutput(CTPPSDataOutput); + //execute the query + coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute(); + cond::Time_t dipTime = 0; + std::string lhcState = "", lhcComment = "", ctppsStatus = ""; + + //debug informations + unsigned int lumiSection = 0; + cond::Time_t runNumber = 0; + cond::Time_t savedDipTime = 0; + unsigned int savedLumiSection = 0; + cond::Time_t savedRunNumber = 0; + + bool ret = false; + LumiSectionFilter filter(m_tmpBuffer); + while (CTPPSDataCursor.next()) { + if (m_debug) { + std::ostringstream CTPPS; + CTPPSDataCursor.currentRow().toOutputStream(CTPPS); + } + coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")]; + if (!dipTimeAttribute.isNull()) { + dipTime = cond::time::from_boost(dipTimeAttribute.data().time()); + if (filter.process(dipTime)) { + ret = true; + coral::Attribute const& lhcStateAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_STATE")]; + if (!lhcStateAttribute.isNull()) { + lhcState = lhcStateAttribute.data(); + } + coral::Attribute const& lhcCommentAttribute = CTPPSDataCursor.currentRow()[std::string("LHC_COMMENT")]; + if (!lhcCommentAttribute.isNull()) { + lhcComment = lhcCommentAttribute.data(); + } + + if (m_debug) { + coral::Attribute const& runNumberAttribute = CTPPSDataCursor.currentRow()[std::string("RUN_NUMBER")]; + if (!runNumberAttribute.isNull()) { + runNumber = runNumberAttribute.data(); + } + coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")]; + if (!lumiSectionAttribute.isNull()) { + lumiSection = lumiSectionAttribute.data(); + } + } + + for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) { + // set the current values to all of the payloads of the lumi section samples after the current since + LHCInfoPerFill& payload = *(it->second); + payload.setLhcState(lhcState); + payload.setLhcComment(lhcComment); + payload.setCtppsStatus(ctppsStatus); + + if (m_debug) { + savedDipTime = dipTime; + savedLumiSection = lumiSection; + savedRunNumber = runNumber; + } + } + } + } + } + if (m_debug) { + edm::LogInfo(m_name) << "Last assigned: " + << "DipTime: " << savedDipTime << " " + << "LumiSection: " << savedLumiSection << " " + << "RunNumber: " << savedRunNumber; + } + return ret; + } + + bool getEcalData(cond::persistency::Session& session, + const boost::posix_time::ptime& lowerTime, + const boost::posix_time::ptime& upperTime, + bool update) { + //run the sixth query against the CMS_DCS_ENV_PVSS_COND schema + //Initializing the CMS_DCS_ENV_PVSS_COND schema. + coral::ISchema& ECAL = session.nominalSchema(); + //start the transaction against the fill logging schema + //execute query for ECAL Data + std::unique_ptr ECALDataQuery(ECAL.newQuery()); + //FROM clause + ECALDataQuery->addToTableList(std::string("BEAM_PHASE")); + //SELECT clause + ECALDataQuery->addToOutputList(std::string("CHANGE_DATE")); + ECALDataQuery->addToOutputList(std::string("DIP_value")); + ECALDataQuery->addToOutputList(std::string("element_nr")); + ECALDataQuery->addToOutputList(std::string("VALUE_NUMBER")); + //WHERE CLAUSE + coral::AttributeList ECALDataBindVariables; + ECALDataBindVariables.extend(std::string("lowerTime")); + ECALDataBindVariables.extend(std::string("upperTime")); + ECALDataBindVariables[std::string("lowerTime")].data() = coral::TimeStamp(lowerTime); + ECALDataBindVariables[std::string("upperTime")].data() = coral::TimeStamp(upperTime); + std::string conditionStr = std::string( + "(DIP_value LIKE '%beamPhaseMean%' OR DIP_value LIKE '%cavPhaseMean%') AND CHANGE_DATE >= :lowerTime AND " + "CHANGE_DATE < :upperTime"); + + ECALDataQuery->setCondition(conditionStr, ECALDataBindVariables); + //ORDER BY clause + ECALDataQuery->addToOrderList(std::string("CHANGE_DATE")); + ECALDataQuery->addToOrderList(std::string("DIP_value")); + ECALDataQuery->addToOrderList(std::string("element_nr")); + //define query output + coral::AttributeList ECALDataOutput; + ECALDataOutput.extend(std::string("CHANGE_DATE")); + ECALDataOutput.extend(std::string("DIP_value")); + ECALDataOutput.extend(std::string("element_nr")); + ECALDataOutput.extend(std::string("VALUE_NUMBER")); + //ECALDataQuery->limitReturnedRows( 14256 ); //3564 entries per vector. + ECALDataQuery->defineOutput(ECALDataOutput); + //execute the query + coral::ICursor& ECALDataCursor = ECALDataQuery->execute(); + cond::Time_t changeTime = 0; + cond::Time_t firstTime = 0; + std::string dipVal = ""; + unsigned int elementNr = 0; + float value = 0.; + std::set initializedVectors; + LumiSectionFilter filter(m_tmpBuffer); + bool ret = false; + if (m_prevPayload.get()) { + for (auto& lumiSlot : m_tmpBuffer) { + lumiSlot.second->setBeam1VC(m_prevPayload->beam1VC()); + lumiSlot.second->setBeam2VC(m_prevPayload->beam2VC()); + lumiSlot.second->setBeam1RF(m_prevPayload->beam1RF()); + lumiSlot.second->setBeam2RF(m_prevPayload->beam2RF()); + } + } + std::map iovMap; + if (m_tmpBuffer.empty()) { + return ret; + } + cond::Time_t lowerLumi = m_tmpBuffer.front().first; + while (ECALDataCursor.next()) { + if (m_debug) { + std::ostringstream ECAL; + ECALDataCursor.currentRow().toOutputStream(ECAL); + } + coral::Attribute const& changeDateAttribute = ECALDataCursor.currentRow()[std::string("CHANGE_DATE")]; + if (!changeDateAttribute.isNull()) { + ret = true; + boost::posix_time::ptime chTime = changeDateAttribute.data().time(); + // move the first IOV found to the start of the fill interval selected + if (changeTime == 0) { + firstTime = cond::time::from_boost(chTime); + } + changeTime = cond::time::from_boost(chTime); + cond::Time_t iovTime = changeTime; + if (!update and changeTime == firstTime) + iovTime = lowerLumi; + coral::Attribute const& dipValAttribute = ECALDataCursor.currentRow()[std::string("DIP_value")]; + coral::Attribute const& valueNumberAttribute = ECALDataCursor.currentRow()[std::string("VALUE_NUMBER")]; + coral::Attribute const& elementNrAttribute = ECALDataCursor.currentRow()[std::string("element_nr")]; + if (!dipValAttribute.isNull() and !valueNumberAttribute.isNull()) { + dipVal = dipValAttribute.data(); + elementNr = elementNrAttribute.data(); + value = valueNumberAttribute.data(); + if (std::isnan(value)) + value = 0.; + if (filter.process(iovTime)) { + iovMap.insert(std::make_pair(changeTime, filter.current()->first)); + for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) { + LHCInfoPerFill& payload = *(it->second); + theLHCInfoPerFillImpl::setElementData(it->first, dipVal, elementNr, value, payload, initializedVectors); + } + } + //} + } + } + } + if (m_debug) { + for (auto& im : iovMap) { + edm::LogInfo(m_name) << "Found iov=" << im.first << " (" << cond::time::to_boost(im.first) << " ) moved to " + << im.second << " ( " << cond::time::to_boost(im.second) << " )"; + } + } + return ret; + } + +private: + bool m_debug; + // starting date for sampling + boost::posix_time::ptime m_startTime; + boost::posix_time::ptime m_endTime; + // sampling interval in seconds + unsigned int m_samplingInterval; + bool m_endFillMode = true; + std::string m_name; + //for reading from relational database source + std::string m_connectionString, m_ecalConnectionString; + std::string m_dipSchema, m_authpath; + std::string m_omsBaseUrl; + std::unique_ptr m_fillPayload; + std::shared_ptr m_prevPayload; + std::vector>> m_tmpBuffer; + bool m_lastPayloadEmpty = false; +}; diff --git a/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc b/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc new file mode 100644 index 0000000000000..5889c23407999 --- /dev/null +++ b/CondTools/RunInfo/plugins/LHCInfoPerLSPopConAnalyzer.cc @@ -0,0 +1,556 @@ +#include "CondCore/CondDB/interface/ConnectionPool.h" +#include "CondCore/CondDB/interface/Types.h" +#include "CondCore/PopCon/interface/PopConAnalyzer.h" +#include "CondCore/PopCon/interface/PopConSourceHandler.h" +#include "CondFormats/Common/interface/TimeConversions.h" +#include "CondFormats/RunInfo/interface/LHCInfoPerLS.h" +#include "CondTools/RunInfo/interface/OMSAccess.h" +#include "CondTools/RunInfo/interface/OMSAccess.h" +#include "CoralBase/Attribute.h" +#include "CoralBase/AttributeList.h" +#include "CoralBase/AttributeSpecification.h" +#include "CoralBase/TimeStamp.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" +#include "RelationalAccess/ICursor.h" +#include "RelationalAccess/IQuery.h" +#include "RelationalAccess/ISchema.h" +#include "RelationalAccess/ISessionProxy.h" +#include +#include +#include +#include +#include +#include +#include +#include + +class LHCInfoPerLSPopConSourceHandler; + +typedef popcon::PopConAnalyzer LHCInfoPerLSPopConAnalyzer; +//define this as a plug-in +DEFINE_FWK_MODULE(LHCInfoPerLSPopConAnalyzer); + +namespace theLHCInfoPerLSImpl { + + struct LumiSectionFilter { + LumiSectionFilter(const std::vector>>& samples) + : currLow(samples.begin()), currUp(samples.begin()), end(samples.end()) { + currUp++; + } + + void reset(const std::vector>>& samples) { + currLow = samples.begin(); + currUp = samples.begin(); + currUp++; + end = samples.end(); + currentDipTime = 0; + } + + bool process(cond::Time_t dipTime) { + if (currLow == end) + return false; + bool search = false; + if (currentDipTime == 0) { + search = true; + } else { + if (dipTime == currentDipTime) + return true; + else { + cond::Time_t upper = cond::time::MAX_VAL; + if (currUp != end) + upper = currUp->first; + if (dipTime < upper && currentDipTime >= currLow->first) + return false; + else { + search = true; + } + } + } + if (search) { + while (currUp != end and currUp->first < dipTime) { + currLow++; + currUp++; + } + currentDipTime = dipTime; + return currLow != end; + } + return false; + } + + cond::Time_t currentSince() { return currLow->first; } + LHCInfoPerLS& currentPayload() { return *currLow->second; } + + std::vector>>::const_iterator current() { return currLow; } + std::vector>>::const_iterator currLow; + std::vector>>::const_iterator currUp; + std::vector>>::const_iterator end; + cond::Time_t currentDipTime = 0; + }; + + bool comparePayloads(const LHCInfoPerLS& rhs, const LHCInfoPerLS& lhs) { + if (rhs.fillNumber() != lhs.fillNumber()) + return false; + if (rhs.runNumber() != lhs.runNumber()) + return false; + if (rhs.crossingAngleX() != lhs.crossingAngleX()) + return false; + if (rhs.crossingAngleY() != lhs.crossingAngleY()) + return false; + if (rhs.betaStarX() != lhs.betaStarX()) + return false; + if (rhs.betaStarY() != lhs.betaStarY()) + return false; + return true; + } + + size_t transferPayloads(const std::vector>>& buffer, + std::map>& iovsToTransfer, + std::shared_ptr& prevPayload) { + size_t niovs = 0; + std::stringstream condIovs; + for (auto& iov : buffer) { + bool add = false; + auto payload = iov.second; + cond::Time_t since = iov.first; + if (iovsToTransfer.empty()) { + add = true; + } else { + LHCInfoPerLS& lastAdded = *iovsToTransfer.rbegin()->second; + if (!comparePayloads(lastAdded, *payload)) { + add = true; + } + } + if (add) { + niovs++; + condIovs << since << " "; + iovsToTransfer.insert(std::make_pair(since, payload)); + prevPayload = iov.second; + } + } + edm::LogInfo("transferPayloads") << "TRANSFERED COND IOVS: " << condIovs.str(); + return niovs; + } + +} // namespace theLHCInfoPerLSImpl +class LHCInfoPerLSPopConSourceHandler : public popcon::PopConSourceHandler { +public: + LHCInfoPerLSPopConSourceHandler(edm::ParameterSet const& pset) + : m_debug(pset.getUntrackedParameter("debug", false)), + m_startTime(), + m_endTime(), + m_samplingInterval((unsigned int)pset.getUntrackedParameter("samplingInterval", 300)), + m_endFillMode(pset.getUntrackedParameter("endFill", true)), + m_name(pset.getUntrackedParameter("name", "LHCInfoPerLSPopConSourceHandler")), + m_connectionString(pset.getUntrackedParameter("connectionString", "")), + m_dipSchema(pset.getUntrackedParameter("DIPSchema", "")), + m_authpath(pset.getUntrackedParameter("authenticationPath", "")), + m_omsBaseUrl(pset.getUntrackedParameter("omsBaseUrl", "")), + m_fillPayload(), + m_prevPayload(), + m_tmpBuffer() { + if (!pset.getUntrackedParameter("startTime").empty()) { + m_startTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("startTime")); + } + boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + m_endTime = now; + if (!pset.getUntrackedParameter("endTime").empty()) { + m_endTime = boost::posix_time::time_from_string(pset.getUntrackedParameter("endTime")); + if (m_endTime > now) + m_endTime = now; + } + } + //L1: try with different m_dipSchema + //L2: try with different m_name + ~LHCInfoPerLSPopConSourceHandler() override = default; + void getNewObjects() override { + //if a new tag is created, transfer fake fill from 1 to the first fill for the first time + if (tagInfo().size == 0) { + edm::LogInfo(m_name) << "New tag " << tagInfo().name << "; from " << m_name << "::getNewObjects"; + } else { + //check what is already inside the database + edm::LogInfo(m_name) << "got info for tag " << tagInfo().name << ": size " << tagInfo().size + << ", last object valid since " << tagInfo().lastInterval.since << " ( " + << boost::posix_time::to_iso_extended_string( + cond::time::to_boost(tagInfo().lastInterval.since)) + << " ); from " << m_name << "::getNewObjects"; + } + + cond::Time_t lastSince = tagInfo().lastInterval.since; + if (tagInfo().isEmpty()) { + // for a new or empty tag, an empty payload should be added on top with since=1 + addEmptyPayload(1); + lastSince = 1; + } else { + edm::LogInfo(m_name) << "The last Iov in tag " << tagInfo().name << " valid since " << lastSince << "from " + << m_name << "::getNewObjects"; + } + + boost::posix_time::ptime executionTime = boost::posix_time::second_clock::local_time(); + cond::Time_t targetSince = 0; + cond::Time_t executionTimeIov = cond::time::from_boost(executionTime); + if (!m_startTime.is_not_a_date_time()) { + targetSince = cond::time::from_boost(m_startTime); + } + if (lastSince > targetSince) + targetSince = lastSince; + + edm::LogInfo(m_name) << "Starting sampling at " + << boost::posix_time::to_simple_string(cond::time::to_boost(targetSince)); + + //retrieve the data from the relational database source + cond::persistency::ConnectionPool connection; + //configure the connection + if (m_debug) { + connection.setMessageVerbosity(coral::Debug); + } else { + connection.setMessageVerbosity(coral::Error); + } + connection.setAuthenticationPath(m_authpath); + connection.configure(); + //create the sessions + cond::persistency::Session session = connection.createSession(m_connectionString, false); + // fetch last payload when available + if (!tagInfo().lastInterval.payloadId.empty()) { + cond::persistency::Session session3 = dbSession(); + session3.transaction().start(true); + m_prevPayload = session3.fetchPayload(tagInfo().lastInterval.payloadId); + session3.transaction().commit(); + + // find startFillTime and endFillTime of the most recent fill already saved in the tag + if (m_prevPayload->fillNumber() != 0) { + cond::OMSService oms; + oms.connect(m_omsBaseUrl); + auto query = oms.query("fills"); + query->addOutputVar("end_time"); + query->filterEQ("fill_number", m_prevPayload->fillNumber()); + bool foundFill = query->execute(); + if (foundFill) { + auto result = query->result(); + + if (!result.empty()) { + auto endFillTime = (*result.begin()).get("end_time"); + m_prevEndFillTime = cond::time::from_boost(endFillTime); + auto startFillTime = (*result.begin()).get("start_time"); + m_prevStartFillTime = cond::time::from_boost(startFillTime); + } else { + foundFill = false; + } + } + if (!foundFill) { + edm::LogError(m_name) << "Could not find end time of fill #" << m_prevPayload->fillNumber(); + } + } else { + m_prevEndFillTime = 0; + m_prevStartFillTime = 0; + } + } + + while (true) { + if (targetSince >= executionTimeIov) { + edm::LogInfo(m_name) << "Sampling ended at the time " + << boost::posix_time::to_simple_string(cond::time::to_boost(executionTimeIov)); + break; + } + boost::posix_time::ptime targetTime = cond::time::to_boost(targetSince); + boost::posix_time::ptime startSampleTime; + boost::posix_time::ptime endSampleTime; + + cond::OMSService oms; + oms.connect(m_omsBaseUrl); + auto query = oms.query("fills"); + + if (!m_endFillMode and m_prevPayload->fillNumber() and m_prevEndFillTime == 0ULL) { + // continue processing unfinished fill with some payloads already in the tag + edm::LogInfo(m_name) << "Searching started fill #" << m_prevPayload->fillNumber(); + query->filterEQ("fill_number", m_prevPayload->fillNumber()); + bool foundFill = query->execute(); + if (foundFill) + foundFill = makeFillPayload(m_fillPayload, query->result()); + if (!foundFill) { + edm::LogError(m_name) << "Could not find fill #" << m_prevPayload->fillNumber(); + break; + } + startSampleTime = cond::time::to_boost(lastSince); + } else { + edm::LogInfo(m_name) << "Searching new fill after " << boost::posix_time::to_simple_string(targetTime); + query->filterNotNull("start_stable_beam").filterNotNull("fill_number"); + if (targetTime > cond::time::to_boost(m_prevStartFillTime)) { + query->filterGE("start_time", targetTime); + } else { + query->filterGT("start_time", targetTime); + } + + query->filterLT("start_time", m_endTime); + if (m_endFillMode) + query->filterNotNull("end_time"); + bool foundFill = query->execute(); + if (foundFill) + foundFill = makeFillPayload(m_fillPayload, query->result()); + if (!foundFill) { + edm::LogInfo(m_name) << "No fill found - END of job."; + break; + } + startSampleTime = cond::time::to_boost(m_startFillTime); + } + + unsigned short lhcFill = m_fillPayload->fillNumber(); + bool ongoingFill = m_endFillTime == 0ULL; + if (ongoingFill) { + edm::LogInfo(m_name) << "Found ongoing fill " << lhcFill << " created at " + << cond::time::to_boost(m_startFillTime); + endSampleTime = executionTime; + targetSince = executionTimeIov; + } else { + edm::LogInfo(m_name) << "Found fill " << lhcFill << " created at " << cond::time::to_boost(m_startFillTime) + << " ending at " << cond::time::to_boost(m_endFillTime); + endSampleTime = cond::time::to_boost(m_endFillTime); + targetSince = m_endFillTime; + } + + if (m_endFillMode || ongoingFill) { + getLumiData(oms, lhcFill, startSampleTime, endSampleTime); + + if (!m_tmpBuffer.empty()) { + boost::posix_time::ptime flumiStart = cond::time::to_boost(m_tmpBuffer.front().first); + boost::posix_time::ptime flumiStop = cond::time::to_boost(m_tmpBuffer.back().first); + edm::LogInfo(m_name) << "First buffered lumi starts at " << flumiStart << " last lumi starts at " + << flumiStop; + session.transaction().start(true); + getCTTPSData(session, startSampleTime, endSampleTime); + session.transaction().commit(); + } + } + + size_t niovs = theLHCInfoPerLSImpl::transferPayloads(m_tmpBuffer, m_iovs, m_prevPayload); + edm::LogInfo(m_name) << "Added " << niovs << " iovs within the Fill time"; + if (niovs) { + m_prevEndFillTime = m_endFillTime; + m_prevStartFillTime = m_startFillTime; + } + m_tmpBuffer.clear(); + if (m_prevPayload->fillNumber() and !ongoingFill) + addEmptyPayload(m_endFillTime); + } + } + std::string id() const override { return m_name; } + + static constexpr unsigned int kLumisectionsQueryLimit = 4000; + +private: + void addEmptyPayload(cond::Time_t iov) { + bool add = false; + if (m_iovs.empty()) { + if (!m_lastPayloadEmpty) + add = true; + } else { + auto lastAdded = m_iovs.rbegin()->second; + if (lastAdded->fillNumber() != 0) { + add = true; + } + } + if (add) { + auto newPayload = std::make_shared(); + m_iovs.insert(std::make_pair(iov, newPayload)); + m_prevPayload = newPayload; + m_prevEndFillTime = 0; + m_prevStartFillTime = 0; + edm::LogInfo(m_name) << "Added empty payload with IOV" << iov << " ( " + << boost::posix_time::to_iso_extended_string(cond::time::to_boost(iov)) << " )"; + } + } + + bool makeFillPayload(std::unique_ptr& targetPayload, const cond::OMSServiceResult& queryResult) { + bool ret = false; + if (!queryResult.empty()) { + auto row = *queryResult.begin(); + auto currentFill = row.get("fill_number"); + m_startFillTime = cond::time::from_boost(row.get("start_time")); + m_endFillTime = cond::time::from_boost(row.get("end_time")); + targetPayload = std::make_unique(); + targetPayload->setFillNumber(currentFill); + ret = true; + } + return ret; + } + + void addPayloadToBuffer(cond::OMSServiceResultRef& row) { + auto lumiTime = row.get("start_time"); + LHCInfoPerLS* thisLumiSectionInfo = new LHCInfoPerLS(*m_fillPayload); + m_tmpBuffer.emplace_back(std::make_pair(cond::time::from_boost(lumiTime), thisLumiSectionInfo)); + } + + size_t bufferAllLS(const cond::OMSServiceResult& queryResult) { + for (auto r : queryResult) { + addPayloadToBuffer(r); + } + return queryResult.size(); + } + + size_t bufferFirstStableBeamLS(const cond::OMSServiceResult& queryResult) { + for (auto r : queryResult) { + if (r.get("beams_stable") == "true") { + addPayloadToBuffer(r); + edm::LogInfo(m_name) << "Buffered first lumisection of stable beam: LS: " + << r.get("lumisection_number") + << " run: " << r.get("run_number"); + return 1; + } + } + return 0; + } + + size_t getLumiData(const cond::OMSService& oms, + unsigned short fillId, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + auto query = oms.query("lumisections"); + query->addOutputVars({"start_time", "run_number", "beams_stable", "lumisection_number"}); + query->filterEQ("fill_number", fillId); + query->filterGT("start_time", beginFillTime).filterLT("start_time", endFillTime); + query->limit(kLumisectionsQueryLimit); + size_t nlumi = 0; + if (query->execute()) { + auto queryResult = query->result(); + if (m_endFillMode) { + nlumi = bufferAllLS(queryResult); + } else if (!queryResult.empty()) { + auto newestPayload = queryResult.back(); + if (newestPayload.get("beams_stable") == "true") { + addPayloadToBuffer(newestPayload); + nlumi = 1; + edm::LogInfo(m_name) << "Buffered most recent lumisection:" + << " LS: " << newestPayload.get("lumisection_number") + << " run: " << newestPayload.get("run_number"); + } + } + edm::LogInfo(m_name) << "Found " << queryResult.size() << " lumisections during the fill " << fillId; + } else { + edm::LogInfo(m_name) << "OMS query for lumisections of fill " << fillId << "failed, status:" << query->status(); + } + return nlumi; + } + bool getCTTPSData(cond::persistency::Session& session, + const boost::posix_time::ptime& beginFillTime, + const boost::posix_time::ptime& endFillTime) { + //run the fifth query against the CTPPS schema + //Initializing the CMS_CTP_CTPPS_COND schema. + coral::ISchema& CTPPS = session.coralSession().schema("CMS_PPS_SPECT_COND"); + //execute query for CTPPS Data + std::unique_ptr CTPPSDataQuery(CTPPS.newQuery()); + //FROM clause + CTPPSDataQuery->addToTableList(std::string("PPS_LHC_MACHINE_PARAMS")); + //SELECT clause + CTPPSDataQuery->addToOutputList(std::string("DIP_UPDATE_TIME")); + CTPPSDataQuery->addToOutputList(std::string("LUMI_SECTION")); + CTPPSDataQuery->addToOutputList(std::string("RUN_NUMBER")); + CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_X_URAD")); + CTPPSDataQuery->addToOutputList(std::string("XING_ANGLE_P5_Y_URAD")); + CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_X_M")); + CTPPSDataQuery->addToOutputList(std::string("BETA_STAR_P5_Y_M")); + //WHERE CLAUSE + coral::AttributeList CTPPSDataBindVariables; + CTPPSDataBindVariables.extend(std::string("beginFillTime")); + CTPPSDataBindVariables.extend(std::string("endFillTime")); + CTPPSDataBindVariables[std::string("beginFillTime")].data() = coral::TimeStamp(beginFillTime); + CTPPSDataBindVariables[std::string("endFillTime")].data() = coral::TimeStamp(endFillTime); + std::string conditionStr = std::string("DIP_UPDATE_TIME>= :beginFillTime and DIP_UPDATE_TIME< :endFillTime"); + CTPPSDataQuery->setCondition(conditionStr, CTPPSDataBindVariables); + //ORDER BY clause + CTPPSDataQuery->addToOrderList(std::string("DIP_UPDATE_TIME")); + //define query output + coral::AttributeList CTPPSDataOutput; + CTPPSDataOutput.extend(std::string("DIP_UPDATE_TIME")); + CTPPSDataOutput.extend(std::string("LUMI_SECTION")); + CTPPSDataOutput.extend(std::string("RUN_NUMBER")); + CTPPSDataOutput.extend(std::string("XING_ANGLE_P5_X_URAD")); + CTPPSDataOutput.extend(std::string("XING_ANGLE_P5_Y_URAD")); + CTPPSDataOutput.extend(std::string("BETA_STAR_P5_X_M")); + CTPPSDataOutput.extend(std::string("BETA_STAR_P5_Y_M")); + CTPPSDataQuery->defineOutput(CTPPSDataOutput); + //execute the query + coral::ICursor& CTPPSDataCursor = CTPPSDataQuery->execute(); + cond::Time_t dipTime = 0; + unsigned int lumiSection = 0; + cond::Time_t runNumber = 0; + float crossingAngleX = 0., betaStarX = 0.; + float crossingAngleY = 0., betaStarY = 0.; + + bool ret = false; + theLHCInfoPerLSImpl::LumiSectionFilter filter(m_tmpBuffer); + while (CTPPSDataCursor.next()) { + if (m_debug) { + std::ostringstream CTPPS; + CTPPSDataCursor.currentRow().toOutputStream(CTPPS); + } + coral::Attribute const& dipTimeAttribute = CTPPSDataCursor.currentRow()[std::string("DIP_UPDATE_TIME")]; + if (!dipTimeAttribute.isNull()) { + dipTime = cond::time::from_boost(dipTimeAttribute.data().time()); + if (filter.process(dipTime)) { + ret = true; + coral::Attribute const& lumiSectionAttribute = CTPPSDataCursor.currentRow()[std::string("LUMI_SECTION")]; + if (!lumiSectionAttribute.isNull()) { + lumiSection = lumiSectionAttribute.data(); + } + coral::Attribute const& runNumberAttribute = CTPPSDataCursor.currentRow()[std::string("RUN_NUMBER")]; + if (!runNumberAttribute.isNull()) { + runNumber = runNumberAttribute.data(); + } + coral::Attribute const& crossingAngleXAttribute = + CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_X_URAD")]; + if (!crossingAngleXAttribute.isNull()) { + crossingAngleX = crossingAngleXAttribute.data(); + } + coral::Attribute const& crossingAngleYAttribute = + CTPPSDataCursor.currentRow()[std::string("XING_ANGLE_P5_Y_URAD")]; + if (!crossingAngleYAttribute.isNull()) { + crossingAngleY = crossingAngleYAttribute.data(); + } + coral::Attribute const& betaStarXAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_X_M")]; + if (!betaStarXAttribute.isNull()) { + betaStarX = betaStarXAttribute.data(); + } + coral::Attribute const& betaStarYAttribute = CTPPSDataCursor.currentRow()[std::string("BETA_STAR_P5_Y_M")]; + if (!betaStarYAttribute.isNull()) { + betaStarY = betaStarYAttribute.data(); + } + for (auto it = filter.current(); it != m_tmpBuffer.end(); it++) { + // set the current values to all of the payloads of the lumi section samples after the current since + LHCInfoPerLS& payload = *(it->second); + payload.setCrossingAngleX(crossingAngleX); + payload.setCrossingAngleY(crossingAngleY); + payload.setBetaStarX(betaStarX); + payload.setBetaStarY(betaStarY); + payload.setLumiSection(lumiSection); + payload.setRunNumber(runNumber); + } + } + } + } + return ret; + } + +private: + bool m_debug; + // starting date for sampling + boost::posix_time::ptime m_startTime; + boost::posix_time::ptime m_endTime; + // sampling interval in seconds + unsigned int m_samplingInterval; + bool m_endFillMode = true; + std::string m_name; + //for reading from relational database source + std::string m_connectionString, m_ecalConnectionString; + std::string m_dipSchema, m_authpath; + std::string m_omsBaseUrl; + std::unique_ptr m_fillPayload; + std::shared_ptr m_prevPayload; + cond::Time_t m_startFillTime; + cond::Time_t m_endFillTime; + cond::Time_t m_prevEndFillTime; + cond::Time_t m_prevStartFillTime; + std::vector>> m_tmpBuffer; + bool m_lastPayloadEmpty = false; +}; diff --git a/CondTools/RunInfo/plugins/LumiSectionFilter.h b/CondTools/RunInfo/plugins/LumiSectionFilter.h new file mode 100644 index 0000000000000..6b40b3727d6d5 --- /dev/null +++ b/CondTools/RunInfo/plugins/LumiSectionFilter.h @@ -0,0 +1,59 @@ +#include "CondFormats/Common/interface/TimeConversions.h" +#include +#include + +template +struct LumiSectionFilter { + LumiSectionFilter(const std::vector>>& samples) + : currLow(samples.begin()), currUp(samples.begin()), end(samples.end()) { + currUp++; + } + + void reset(const std::vector>>& samples) { + currLow = samples.begin(); + currUp = samples.begin(); + currUp++; + end = samples.end(); + currentDipTime = 0; + } + + bool process(cond::Time_t dipTime) { + if (currLow == end) + return false; + bool search = false; + if (currentDipTime == 0) { + search = true; + } else { + if (dipTime == currentDipTime) + return true; + else { + cond::Time_t upper = cond::time::MAX_VAL; + if (currUp != end) + upper = currUp->first; + if (dipTime < upper && currentDipTime >= currLow->first) + return false; + else { + search = true; + } + } + } + if (search) { + while (currUp != end and currUp->first < dipTime) { + currLow++; + currUp++; + } + currentDipTime = dipTime; + return currLow != end; + } + return false; + } + + cond::Time_t currentSince() { return currLow->first; } + T& currentPayload() { return *currLow->second; } + + typename std::vector>>::const_iterator current() { return currLow; } + typename std::vector>>::const_iterator currLow; + typename std::vector>>::const_iterator currUp; + typename std::vector>>::const_iterator end; + cond::Time_t currentDipTime = 0; +}; diff --git a/CondTools/RunInfo/python/LHCInfoPerFillPopConAnalyzer.py b/CondTools/RunInfo/python/LHCInfoPerFillPopConAnalyzer.py new file mode 100644 index 0000000000000..e4b6dd762df21 --- /dev/null +++ b/CondTools/RunInfo/python/LHCInfoPerFillPopConAnalyzer.py @@ -0,0 +1,114 @@ +import socket +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing +process = cms.Process("LHCInfoPerFillPopulator") +from CondCore.CondDB.CondDB_cfi import * +#process.load("CondCore.DBCommon.CondDBCommon_cfi") +#process.CondDBCommon.connect = 'sqlite_file:lhcinfoperls_pop_test.db' +#process.CondDBCommon.DBParameters.authenticationPath = '.' +#process.CondDBCommon.DBParameters.messageLevel=cms.untracked.int32(1) + +sourceConnection = 'oracle://cms_omds_adg/CMS_RUNINFO_R' +if socket.getfqdn().find('.cms') != -1: + sourceConnection = 'oracle://cms_omds_lb/CMS_RUNINFO_R' + +options = VarParsing.VarParsing() +options.register( 'mode' + , None # Required parameter + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "The mode the fills are going to be process and the data gathered. Accepted values: duringFill endFill" + ) +options.register( 'destinationConnection' + , 'sqlite_file:lhcinfo_pop_test.db' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Connection string to the DB where payloads will be possibly written." + ) +options.register( 'targetConnection' + , '' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Connection string to the target DB: + if not empty (default), this provides the latest IOV and payloads to compare; + it is the DB where payloads should be finally uploaded.""" + ) +options.register( 'tag' + , 'LHCInfoPerFill_PopCon_test' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Tag written in destinationConnection and finally appended in targetConnection." + ) +options.register( 'messageLevel' + , 0 #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.int + , "Message level; default to 0" + ) +options.register( 'startTime' + , '2021-09-10 03:10:18.000' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Date and time of the start of processing: + processes only fills starting at startTime or later""" + ) +options.register( 'endTime' + , '' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Date and time of the start of processing: + processes only fills starting before endTime; + default to empty string which sets no restriction""" + ) +options.parseArguments() +if options.mode is None: + raise ValueError("mode argument not provided. Supported modes are: duringFill endFill") +if options.mode not in ("duringFill", "endFill"): + raise ValueError("Wrong mode argument. Supported modes are: duringFill endFill") + +CondDBConnection = CondDB.clone( connect = cms.string( options.destinationConnection ) ) +CondDBConnection.DBParameters.messageLevel = cms.untracked.int32( options.messageLevel ) + +process.MessageLogger = cms.Service("MessageLogger", + cout = cms.untracked.PSet(threshold = cms.untracked.string('INFO')), + destinations = cms.untracked.vstring('cout') + ) + +process.source = cms.Source("EmptyIOVSource", + lastValue = cms.uint64(1), + timetype = cms.string('runnumber'), + firstValue = cms.uint64(1), + interval = cms.uint64(1) + ) + +process.PoolDBOutputService = cms.Service("PoolDBOutputService", + CondDBConnection, + timetype = cms.untracked.string('timestamp'), + toPut = cms.VPSet(cms.PSet(record = cms.string('LHCInfoPerFillRcd'), + tag = cms.string( options.tag ) + ) + ) + ) + +process.Test1 = cms.EDAnalyzer("LHCInfoPerFillPopConAnalyzer", + SinceAppendMode = cms.bool(True), + record = cms.string('LHCInfoPerFillRcd'), + name = cms.untracked.string('LHCInfo'), + Source = cms.PSet(fill = cms.untracked.uint32(6417), + startTime = cms.untracked.string(options.startTime), + endTime = cms.untracked.string(options.endTime), + samplingInterval = cms.untracked.uint32( 600 ), + endFill = cms.untracked.bool(True if options.mode == "endFill" else False), + name = cms.untracked.string("LHCInfoPerFillPopConSourceHandler"), + connectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_RUNTIME_LOGGER"), + ecalConnectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_DCS_ENV_PVSS_COND"), + DIPSchema = cms.untracked.string("CMS_BEAM_COND"), + omsBaseUrl = cms.untracked.string("http://vocms0184.cern.ch/agg/api/v1"), + authenticationPath = cms.untracked.string(""), + debug=cms.untracked.bool(False) + ), + loggingOn = cms.untracked.bool(True), + IsDestDbCheckedInQueryLog = cms.untracked.bool(False) + ) + +process.p = cms.Path(process.Test1) diff --git a/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzer.py b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzer.py new file mode 100644 index 0000000000000..522aedae4ec1c --- /dev/null +++ b/CondTools/RunInfo/python/LHCInfoPerLSPopConAnalyzer.py @@ -0,0 +1,113 @@ +import socket +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing +process = cms.Process("LHCInfoPerLSPopulator") +from CondCore.CondDB.CondDB_cfi import * +#process.load("CondCore.DBCommon.CondDBCommon_cfi") +#process.CondDBCommon.connect = 'sqlite_file:lhcinfoperls_pop_test.db' +#process.CondDBCommon.DBParameters.authenticationPath = '.' +#process.CondDBCommon.DBParameters.messageLevel=cms.untracked.int32(1) + +sourceConnection = 'oracle://cms_omds_adg/CMS_RUNINFO_R' +if socket.getfqdn().find('.cms') != -1: + sourceConnection = 'oracle://cms_omds_lb/CMS_RUNINFO_R' + +options = VarParsing.VarParsing() +options.register( 'mode' + , None # Required parameter + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "The mode the fills are going to be process and the data gathered. Accepted values: duringFill endFill" + ) +options.register( 'destinationConnection' + , 'sqlite_file:lhcinfo_pop_test.db' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Connection string to the DB where payloads will be possibly written." + ) +options.register( 'targetConnection' + , '' #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Connection string to the target DB: + if not empty (default), this provides the latest IOV and payloads to compare; + it is the DB where payloads should be finally uploaded.""" + ) +options.register( 'tag' + , 'LHCInfoPerLS_PopCon_test' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , "Tag written in destinationConnection and finally appended in targetConnection." + ) +options.register( 'messageLevel' + , 0 #default value + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.int + , "Message level; default to 0" + ) +options.register( 'startTime' + , '2021-09-10 03:10:18.000' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Date and time of the start of processing: + processes only fills starting at startTime or later""" + ) +options.register( 'endTime' + , '' + , VarParsing.VarParsing.multiplicity.singleton + , VarParsing.VarParsing.varType.string + , """Date and time of the start of processing: + processes only fills starting before endTime; + default to empty string which sets no restriction""" + ) +options.parseArguments() +if options.mode is None: + raise ValueError("mode argument not provided. Supported modes are: duringFill endFill") +if options.mode not in ("duringFill", "endFill"): + raise ValueError("Wrong mode argument. Supported modes are: duringFill endFill") + +CondDBConnection = CondDB.clone( connect = cms.string( options.destinationConnection ) ) +CondDBConnection.DBParameters.messageLevel = cms.untracked.int32( options.messageLevel ) + +process.MessageLogger = cms.Service("MessageLogger", + cout = cms.untracked.PSet(threshold = cms.untracked.string('INFO')), + destinations = cms.untracked.vstring('cout') + ) + +process.source = cms.Source("EmptyIOVSource", + lastValue = cms.uint64(1), + timetype = cms.string('runnumber'), + firstValue = cms.uint64(1), + interval = cms.uint64(1) + ) + +process.PoolDBOutputService = cms.Service("PoolDBOutputService", + CondDBConnection, + timetype = cms.untracked.string('timestamp'), + toPut = cms.VPSet(cms.PSet(record = cms.string('LHCInfoPerLSRcd'), + tag = cms.string( options.tag ) + ) + ) + ) + +process.Test1 = cms.EDAnalyzer("LHCInfoPerLSPopConAnalyzer", + SinceAppendMode = cms.bool(True), + record = cms.string('LHCInfoPerLSRcd'), + name = cms.untracked.string('LHCInfo'), + Source = cms.PSet(fill = cms.untracked.uint32(6417), + startTime = cms.untracked.string(options.startTime), + endTime = cms.untracked.string(options.endTime), + samplingInterval = cms.untracked.uint32( 600 ), + endFill = cms.untracked.bool(True if options.mode == "endFill" else False), + name = cms.untracked.string("LHCInfoPerLSPopConSourceHandler"), + connectionString = cms.untracked.string("oracle://cms_orcon_adg/CMS_RUNTIME_LOGGER"), + DIPSchema = cms.untracked.string("CMS_BEAM_COND"), + omsBaseUrl = cms.untracked.string("http://vocms0184.cern.ch/agg/api/v1"), + authenticationPath = cms.untracked.string(""), + debug=cms.untracked.bool(False) + ), + loggingOn = cms.untracked.bool(True), + IsDestDbCheckedInQueryLog = cms.untracked.bool(False) + ) + +process.p = cms.Path(process.Test1) diff --git a/CondTools/RunInfo/src/OMSAccess.cc b/CondTools/RunInfo/src/OMSAccess.cc index 4a699c955428e..cc033e42ed6b8 100644 --- a/CondTools/RunInfo/src/OMSAccess.cc +++ b/CondTools/RunInfo/src/OMSAccess.cc @@ -33,6 +33,16 @@ namespace cond { OMSServiceResultIterator OMSServiceResult::end() const { return OMSServiceResultIterator(m_data->end()); } + OMSServiceResultRef OMSServiceResult::front() const { + auto& attributeList = m_data->front().second.get_child("attributes"); + return OMSServiceResultRef(&attributeList); + } + + OMSServiceResultRef OMSServiceResult::back() const { + auto& attributeList = m_data->back().second.get_child("attributes"); + return OMSServiceResultRef(&attributeList); + } + size_t OMSServiceResult::parseData(const std::string& data) { m_data = nullptr; std::stringstream sout; diff --git a/CondTools/RunInfo/test/BuildFile.xml b/CondTools/RunInfo/test/BuildFile.xml index 3af9ead5f1a2d..198a3fb00b7f8 100644 --- a/CondTools/RunInfo/test/BuildFile.xml +++ b/CondTools/RunInfo/test/BuildFile.xml @@ -5,3 +5,4 @@ + diff --git a/CondTools/RunInfo/test/test_lhcInfoNewPopCon.sh b/CondTools/RunInfo/test/test_lhcInfoNewPopCon.sh new file mode 100755 index 0000000000000..e503999330647 --- /dev/null +++ b/CondTools/RunInfo/test/test_lhcInfoNewPopCon.sh @@ -0,0 +1,64 @@ +#!/bin/sh + +SCRIPTS_DIR=${CMSSW_BASE}/src/CondTools/RunInfo/python + +function die { echo Failure $1: status $2 ; exit $2 ; } + +assert_equal() { + expected="$1" + actual="$2" + message="$3" + + if [ "$expected" != "$actual" ]; then + die "$message: Expected $expected, but got $actual" 1 + fi +} + +function assert_found_fills { + log_file="$1" + script_name="$2" + shift 2 + for fill_nr in "$@"; do + if ! grep -q "Found fill $fill_nr" "$log_file"; then + die "$script_name didn't find fill $fill_nr" 1 # TODO FIX + fi + done +} + +rm -f lhcinfo_pop_unit_test.db + +echo "testing LHCInfoPerFillPopConAnalyzer in EndFill mode for startTime=\"2022-10-24 01:00:00.000\" endTime=\"2022-10-24 20:00:00.000\"" +cmsRun ${SCRIPTS_DIR}/LHCInfoPerFillPopConAnalyzer.py mode=endFill \ + destinationConnection="sqlite_file:lhcinfo_pop_unit_test.db" \ + startTime="2022-10-24 01:00:00.000" endTime="2022-10-24 20:00:00.000" \ + tag=fill_end_test > fill_end_test.log || die "cmsRun LHCInfoPerFillPopConAnalyzer.py mode=endFill" $? +assert_equal 7 `cat fill_end_test.log | grep -E '^Since ' | \ + wc -l` "LHCInfoPerFillPopConAnalyzer in EndFill mode written wrong number of payloads" +assert_found_fills fill_end_test.log "LHCInfoPerFillPopConAnalyzer in EndFill mode" 8307 8309 + +echo "testing LHCInfoPerLSPopConAnalyzerEndFill in endFill mode for startTime=\"2022-10-24 01:00:00.000\" endTime=\"2022-10-24 20:00:00.000\"" +cmsRun ${SCRIPTS_DIR}/LHCInfoPerLSPopConAnalyzer.py mode=endFill \ + destinationConnection="sqlite_file:lhcinfo_pop_unit_test.db" \ + startTime="2022-10-24 01:00:00.000" endTime="2022-10-24 20:00:00.000" \ + tag=ls_end_test > ls_end_test.log || die "cmsRun LHCInfoPerLSPopConAnalyzer.py mode=endFill" $? +assert_equal 169 `cat ls_end_test.log | grep -E '^Since ' | \ + wc -l` "LHCInfoPerLSPopConAnalyzerEndFill in endFill mode written wrong number of payloads" +assert_found_fills ls_end_test.log "LHCInfoPerLSPopConAnalyzerEndFill in endFill mode" 8307 8309 + +echo "testing LHCInfoPerFillPopConAnalyzer in DuringFill mode for startTime=\"2022-10-24 01:00:00.000\" endTime=\"2022-10-24 20:00:00.000\"" +cmsRun ${SCRIPTS_DIR}/LHCInfoPerFillPopConAnalyzer.py mode=duringFill \ + destinationConnection="sqlite_file:lhcinfo_pop_unit_test.db" \ + startTime="2022-10-24 01:00:00.000" endTime="2022-10-24 20:00:00.000" \ + tag=fill_during_test > fill_during_test.log || die "cmsRun LHCInfoPerFillPopConAnalyzer.py" $? +assert_equal 1 `cat fill_during_test.log | grep -E '^Since ' | \ + wc -l` "LHCInfoPerFillPopConAnalyzer in DuringFill written wrong number of payloads" +assert_found_fills fill_during_test.log "LHCInfoPerFillPopConAnalyzer in DuringFill" 8307 8309 + +echo "testing LHCInfoPerLSPopConAnalyzerEndFill in duringFill mode for startTime=\"2022-10-24 01:00:00.000\" endTime=\"2022-10-24 20:00:00.000\"" +cmsRun ${SCRIPTS_DIR}/LHCInfoPerLSPopConAnalyzer.py mode=duringFill \ + destinationConnection="sqlite_file:lhcinfo_pop_unit_test.db" \ + startTime="2022-10-24 01:00:00.000" endTime="2022-10-24 20:00:00.000" \ + tag=ls_during_test > ls_during_test.log || die "cmsRun LHCInfoPerLSPopConAnalyzer.py mode=duringFill" $? +assert_equal 1 `cat ls_during_test.log | grep -E '^Since ' | \ + wc -l` "LHCInfoPerLSPopConAnalyzerEndFill in duringFill mode written wrong number of payloads" +assert_found_fills ls_during_test.log "LHCInfoPerLSPopConAnalyzerEndFill in duringFill mode" 8307 8309 \ No newline at end of file