-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathDQMFileIterator.cc
414 lines (318 loc) · 13 KB
/
DQMFileIterator.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
#include "DQMFileIterator.h"
#include "DQMMonitoringService.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"

#include <chrono>
#include <filesystem>
#include <thread>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/regex.hpp>

#include <fmt/printf.h>
namespace dqmservices {
/// Parse one per-lumisection json file into a LumiEntry.
///
/// The document must contain a child called "data".  Its element 0 is read as
/// the number of processed events and element 1 as the number of accepted
/// events.  When datafn_position is non-negative, the element at that index is
/// read as the data file name.
///
/// @param run_path         run directory, stored in the entry as run_path
/// @param filename         json file to read, stored in the entry as filename
/// @param lumiNumber       lumisection number, stored in the entry as file_ls
/// @param datafn_position  index of the data file name inside "data";
///                         a negative value leaves datafn untouched
/// @return the populated entry
/// @throws boost::property_tree::ptree_error (derives from std::exception) when
///         the file cannot be parsed, "data" is missing or has too few
///         elements, or an element cannot be converted to the requested type
DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(const std::string& run_path,
                                                                 const std::string& filename,
                                                                 int lumiNumber,
                                                                 int datafn_position) {
  boost::property_tree::ptree pt;
  read_json(filename, pt);

  const boost::property_tree::ptree& data = pt.get_child("data");

  // Elements 0 and 1 are always read; the data file name may sit further out.
  const std::size_t required = (datafn_position > 1) ? static_cast<std::size_t>(datafn_position) + 1 : 2;

  // Advancing an iterator past end() is undefined behaviour, so a short
  // (e.g. partially written) array has to be rejected before indexing.
  if (data.size() < required) {
    throw boost::property_tree::ptree_error("'data' array in " + filename + " has " + std::to_string(data.size()) +
                                            " element(s), at least " + std::to_string(required) + " required");
  }

  const auto element = [&data](std::size_t index) -> const boost::property_tree::ptree& {
    return std::next(data.begin(), index)->second;
  };

  LumiEntry lumi;
  lumi.filename = filename;
  lumi.run_path = run_path;

  lumi.n_events_processed = element(0).get_value<std::size_t>();
  lumi.n_events_accepted = element(1).get_value<std::size_t>();

  lumi.file_ls = lumiNumber;

  if (datafn_position >= 0) {
    lumi.datafn = element(static_cast<std::size_t>(datafn_position)).get_value<std::string>();
  }

  return lumi;
}
/// Location of the data file referenced by this entry.
///
/// A datafn that begins with "/" is returned unchanged; any other value is
/// appended to run_path.
std::string DQMFileIterator::LumiEntry::get_data_path() const {
  const bool startsAtRoot = boost::starts_with(datafn, "/");
  if (!startsAtRoot) {
    std::filesystem::path joined(run_path);
    joined /= datafn;
    return joined.string();
  }

  return datafn;
}
/// Path of the json file stored in this entry's filename member.
std::string DQMFileIterator::LumiEntry::get_json_path() const {
  return filename;
}
// The contents of the EoR json file are ignored for the moment, so this
// function is currently not called.
//
// It reads element 1 of the "data" array as n_events and element 2 as n_lumi,
// and marks the entry as loaded.
//
// NOTE(review): an earlier comment here said n_events is expected to be the
// first item of the array, yet index 1 (the second element) is what is read.
// Confirm the EoR file layout before putting this function to use.
DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(const std::string& run_path,
                                                               const std::string& filename) {
  boost::property_tree::ptree doc;
  read_json(filename, doc);

  const auto& data = doc.get_child("data");
  const auto first = data.begin();

  EorEntry entry;
  entry.filename = filename;
  entry.run_path = run_path;
  entry.n_events = std::next(first, 1)->second.get_value<std::size_t>();
  entry.n_lumi = std::next(first, 2)->second.get_value<std::size_t>();
  entry.loaded = true;

  return entry;
}
/// Build the iterator from its untracked configuration parameters and run the
/// initial directory scan through reset().
///
/// @param pset  parameter set providing runNumber, datafnPosition, runInputDir,
///              streamLabel, delayMillis, nextLumiTimeoutMillis and scanOnce
DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(State::EOR) {
  // what to look for, and where
  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
  datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
  streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");

  // timing
  delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
  nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");

  // "scan once" mode
  flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");

  // not configurable: collect() compares the time since the last scan against this value
  forceFileCheckTimeoutMillis_ = 5015;

  // must come last, it uses everything configured above
  reset();
}
void DQMFileIterator::reset() {
runPath_.clear();
std::vector<std::string> tokens;
boost::split(tokens, runInputDir_, boost::is_any_of(":"));
for (const auto& token : tokens) {
runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
}
eor_.loaded = false;
state_ = State::OPEN;
nextLumiNumber_ = 1;
lumiSeen_.clear();
filesSeen_.clear();
lastLumiLoad_ = std::chrono::high_resolution_clock::now();
collect(true);
update_state();
if (mon_.isAvailable()) {
boost::property_tree::ptree doc;
doc.put("run", runNumber_);
doc.put("next_lumi", nextLumiNumber_);
doc.put("fi_state", std::to_string(state_));
mon_->outputUpdate(doc);
}
}
/// Return the entry stored for the lumisection currently expected and advance
/// to the following one.
///
/// The entry is looked up with operator[] on lumiSeen_.  The returned copy is
/// taken after advanceToLumi() ran, so it carries the state that call assigns.
DQMFileIterator::LumiEntry DQMFileIterator::open() {
  const auto opened = nextLumiNumber_;

  LumiEntry& entry = lumiSeen_[opened];
  advanceToLumi(opened + 1, "open: file iterator");

  return entry;
}
/// True when lumiSeen_ holds an entry for the lumisection that is expected next.
bool DQMFileIterator::lumiReady() {
  const auto found = lumiSeen_.find(nextLumiNumber_);
  return found != lumiSeen_.end();
}
/// Highest lumisection number present in lumiSeen_, or 1 when nothing has been
/// recorded yet.
unsigned int DQMFileIterator::lastLumiFound() {
  if (lumiSeen_.empty()) {
    return 1;
  }

  const auto last = lumiSeen_.rbegin();
  return last->first;
}
void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
unsigned int currentLumi = nextLumiNumber_;
nextLumiNumber_ = lumi;
lastLumiLoad_ = std::chrono::high_resolution_clock::now();
auto iter = lumiSeen_.lower_bound(currentLumi);
while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
iter->second.state = reason;
monUpdateLumi(iter->second);
++iter;
}
if (mon_.isAvailable()) {
// report the successful lumi file open
boost::property_tree::ptree doc;
doc.put("next_lumi", nextLumiNumber_);
mon_->outputUpdate(doc);
}
}
void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
if (!mon_.isAvailable())
return;
boost::property_tree::ptree doc;
doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
mon_->outputUpdate(doc);
}
/// Combine the last-write times of all existing run paths into one value.
///
/// Each time is taken in microseconds since the clock's epoch and XOR-ed into
/// the result; paths that do not exist are left out.  collect() compares the
/// value with the one from its previous scan.
unsigned DQMFileIterator::mtimeHash() const {
  using std::chrono::duration_cast;
  using std::chrono::microseconds;

  unsigned combined = 0;
  for (const auto& dir : runPath_) {
    if (std::filesystem::exists(dir)) {
      const auto stamp = std::filesystem::last_write_time(dir);
      combined = combined ^ duration_cast<microseconds>(stamp.time_since_epoch()).count();
    }
  }

  return combined;
}
/// Scan the run directories for lumisection json files and for the
/// end-of-run marker, and record what was found.
///
/// Unless ignoreTimers is set the scan is skipped when
///  - the previous scan happened less than 100 ms ago, or
///  - mtimeHash() is unchanged and less than forceFileCheckTimeoutMillis_
///    milliseconds have passed since the previous scan.
///
/// File names of the form run<digits>_ls<digits>_<label>[_...].jsn are
/// considered.  Files of another run or another stream label are skipped, and
/// so are lumisections that are already loaded.  Everything else is loaded
/// with LumiEntry::load_json() and stored in lumiSeen_.  A file that fails to
/// load is removed from filesSeen_ again so that a later scan retries it.
///
/// When an EoR file (lumi 0, label "EoR") is found, or flagScanOnce_ is set,
/// the end of run is marked as loaded and n_lumi is set to the highest
/// lumisection found so far (0 if none).
///
/// @param ignoreTimers  scan unconditionally, bypassing both checks above
void DQMFileIterator::collect(bool ignoreTimers) {
  auto now = std::chrono::high_resolution_clock::now();
  auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();

  // don't refresh if it's too soon
  if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
    return;
  }

  // don't refresh if the directories look unchanged, unless the forced-check interval has passed
  auto mtime_now = mtimeHash();
  if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
    return;
  }

  runPathMTime_ = mtime_now;
  runPathLastCollect_ = now;

  using std::filesystem::directory_iterator;

  // The pattern never changes: compile it once instead of once per directory entry.
  static const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");

  std::string fn_eor;
  for (const auto& runPath : runPath_) {
    if (!std::filesystem::exists(runPath)) {
      logFileAction("Directory does not exist: ", runPath);
      continue;
    }

    for (const auto& entry : directory_iterator(runPath)) {
      const std::string filename = entry.path().filename().string();
      const std::string fn = entry.path().string();

      if (filesSeen_.find(filename) != filesSeen_.end()) {
        continue;
      }

      boost::smatch result;
      if (!boost::regex_match(filename, result, fn_re)) {
        continue;
      }

      filesSeen_.insert(filename);

      // The pattern accepts digit runs of any length, so the conversion can
      // overflow and throw.  One oddly named file must not abort the scan.
      unsigned int run = 0;
      unsigned int lumi = 0;
      try {
        run = std::stoi(result[1]);
        lumi = std::stoi(result[2]);
      } catch (const std::exception& e) {
        std::string msg("Found and skipped json file (cannot parse run/lumi number, ");
        msg += e.what();
        msg += "): ";
        logFileAction(msg, fn);
        continue;
      }
      std::string label = result[3];

      if (run != runNumber_)
        continue;

      // check if this is EoR
      // for various reasons we have to load it after all other files
      if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
        fn_eor = fn;
        continue;
      }

      // check if lumi is loaded
      if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
        continue;  // already loaded
      }

      // check if this belongs to us
      if (label != streamLabel_) {
        std::string msg("Found and skipped json file (stream label mismatch, ");
        msg += label + " [files] != " + streamLabel_ + " [config]";
        msg += "): ";
        logFileAction(msg, fn);
        continue;
      }

      try {
        LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
        lumiSeen_.emplace(lumi, lumi_jsn);
        logFileAction("Found and loaded json file: ", fn);

        monUpdateLumi(lumi_jsn);
      } catch (const std::exception& e) {
        // forget the file again so that a later scan gets another go at it
        filesSeen_.erase(filename);

        std::string msg("Found, tried to load the json, but failed (");
        msg += e.what();
        msg += "): ";
        logFileAction(msg, fn);
      }
    }
  }

  if ((!fn_eor.empty()) || flagScanOnce_) {
    if (!fn_eor.empty()) {
      logFileAction("EoR file found: ", fn_eor);
    }

    // @TODO load EoR files correctly
    // eor_ = EorEntry::load_json(fn_eor);
    // logFileAction("Loaded eor file: ", fn_eor);

    // for now, set n_lumi to the highest _found_ lumi
    eor_.loaded = true;

    if (lumiSeen_.empty()) {
      eor_.n_lumi = 0;
    } else {
      eor_.n_lumi = lumiSeen_.rbegin()->first;
    }
  }
}
void DQMFileIterator::update_state() {
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::milliseconds;
State old_state = state_;
// in scanOnce mode we don't do repeated scans
// whatever found at reset() is be used
if (!flagScanOnce_) {
collect(false);
}
if ((state_ == State::OPEN) && (eor_.loaded)) {
state_ = State::EOR_CLOSING;
}
// special case for missing lumi files
// skip to the next available, but after the timeout
if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
if (elapsed_ms >= nextLumiTimeoutMillis_) {
std::string msg("Timeout reached, skipping lumisection(s) ");
msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
logFileAction(msg);
advanceToLumi(iter->first, "skipped: timeout");
}
}
}
if (state_ == State::EOR_CLOSING) {
// check if we parsed all lumis
// n_lumi is both last lumi and the number of lumi
// since lumis are indexed from 1
// after all lumi have been pop()'ed
// current lumi will become larger than the last lumi
if (nextLumiNumber_ > eor_.n_lumi) {
state_ = State::EOR;
}
}
if (state_ != old_state) {
logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
if (mon_) {
boost::property_tree::ptree doc;
doc.put("fi_state", std::to_string(state_));
mon_->outputUpdate(doc);
}
}
}
// Write one line to the "fileAction" message category and flush the message log.
//
// The line consists of edm::TimeOfDay(), a space, msg and fileName, in that
// order. std::setprecision(0) is applied to the stream before the time is
// written.
//
// msg:      text of the message
// fileName: text appended directly after msg (no separator is inserted)
void DQMFileIterator::logFileAction(const std::string& msg, const std::string& fileName) const {
// The LogAbsolute object is a temporary of this statement, so it is gone
// before the flush below is requested.
edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
edm::FlushMessageLog();
}
/// Set the state text of the stored entry that has the same file_ls as `lumi`
/// and report it through monUpdateLumi().  If lumiSeen_ has no such entry an
/// internal-error message is logged instead and nothing is changed.
///
/// @param lumi  entry identifying the lumisection (only file_ls is used)
/// @param msg   new state text
void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
  const auto found = lumiSeen_.find(lumi.file_ls);
  if (found == lumiSeen_.end()) {
    logFileAction("Internal error: referenced lumi is not the map.");
    return;
  }

  found->second.state = msg;
  monUpdateLumi(found->second);
}
/// Send a keep-alive to the monitoring service (when available) and then
/// block the calling thread for delayMillis_ milliseconds.
///
/// The wait uses std::this_thread::sleep_for instead of usleep(): the
/// millisecond value is passed as a duration, so there is no 32-bit
/// "* 1000" overflow and no dependence on the range POSIX specifies for the
/// usleep() argument.
///
/// NOTE(review): usleep() can return early when a signal is delivered, while
/// sleep_for() blocks for at least the requested time - confirm no caller
/// relies on the early wake-up.
void DQMFileIterator::delay() {
  if (mon_.isAvailable())
    mon_->keepAlive();

  std::this_thread::sleep_for(std::chrono::milliseconds(delayMillis_));
}
/// Declare the untracked parameters read by the constructor, with their
/// defaults (where given) and help texts.
///
/// @param desc  description object the parameters are added to
void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {
  // no default
  desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");

  // default: 3
  desc.addUntracked<unsigned int>("datafnPosition", 3)
      ->setComment("Data filename position in the positional arguments array 'data' in json file.");

  // no default
  desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");

  // no default
  desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");

  // default: -1
  desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
      ->setComment(
          "Number of milliseconds to wait before switching to the next lumi section "
          "if the current is missing, -1 to disable.");

  // default: false
  desc.addUntracked<bool>("scanOnce", false)
      ->setComment(
          "Don't repeat file scans: use what was found during the initial scan. EOR file "
          "is ignored and the state is set to 'past end of run'.");

  // no default
  desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
}
} // namespace dqmservices