Skip to content

Commit

Permalink
DPL: allow customising DataProcessingStats intervals
Browse files Browse the repository at this point in the history
In particular, this allows setting a minimum value for online metrics.
  • Loading branch information
ktf committed Oct 10, 2024
1 parent 2701499 commit 2a47fa0
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 19 deletions.
11 changes: 10 additions & 1 deletion Framework/Core/include/Framework/DataProcessingStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <atomic>
#include <cstdint>
#include <array>
#include <memory>
#include <numeric>
#include <mutex>
#include <utility>
Expand Down Expand Up @@ -69,8 +70,16 @@ enum struct ProcessingStatsId : short {

/// Helper struct to hold statistics about the data processing happening.
struct DataProcessingStats {
// Parameters for the default behaviour
struct DefaultConfig {
int64_t minOnlinePublishInterval = 0;
};

DefaultConfig config = {};

DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp);
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
DefaultConfig config);

constexpr static ServiceKind service_kind = ServiceKind::Global;
constexpr static unsigned short MAX_METRICS = 1 << 15;
Expand Down
11 changes: 7 additions & 4 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -715,14 +715,14 @@ o2::framework::ServiceSpec
O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Queueing oldest possible timeslice %" PRIu64 " propagation for execution.",
(uint64_t)oldestPossibleOutput.timeslice.value);
AsyncQueueHelpers::post(
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
.id = decongestion.oldestPossibleTimesliceTask,
queue, AsyncTask{ .timeslice = TimesliceId{oldestPossibleTimeslice},
.id = decongestion.oldestPossibleTimesliceTask,
.debounce = -1, .callback = decongestionCallback}
.user<DecongestionContext>(DecongestionContext{.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));

if (decongestion.orderedCompletionPolicyActive) {
AsyncQueueHelpers::post(
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleOutput.timeslice.value},.id = decongestion.oldestPossibleTimesliceTask, .debounce = -1,
.callback = decongestionCallbackOrdered}
.user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
} },
Expand Down Expand Up @@ -867,8 +867,11 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
clock_gettime(CLOCK_REALTIME, &now);
uv_update_time(state.loop);
uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
DataProcessingStats::DefaultConfig config = {
.minOnlinePublishInterval = std::stoi(options.GetProperty<std::string>("dpl-stats-min-online-publishing-interval").c_str()) * 1000};
auto* stats = new DataProcessingStats(TimingHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
TimingHelpers::defaultCPUTimeConfigurator(state.loop));
TimingHelpers::defaultCPUTimeConfigurator(state.loop),
config);
auto& runningWorkflow = services.get<RunningWorkflowInfo const>();

// It makes no sense to update the stats more often than every 5s
Expand Down
14 changes: 8 additions & 6 deletions Framework/Core/src/DataProcessingStats.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,20 @@

#include "Framework/DataProcessingStats.h"
#include "Framework/RuntimeError.h"
#include "Framework/ServiceRegistryRef.h"
#include "Framework/DeviceState.h"
#include "Framework/Logger.h"
#include <uv.h>
#include <iostream>
#include <atomic>
#include <utility>
#include <thread>

namespace o2::framework
{

DataProcessingStats::DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_)
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_,
DefaultConfig config_)
: getTimestamp(getTimestamp_),
getRealtimeBase(getRealtimeBase_)
getRealtimeBase(getRealtimeBase_),
config(config_)
{
getRealtimeBase(realTimeBase, initialTimeOffset);
}
Expand Down Expand Up @@ -269,6 +268,9 @@ void DataProcessingStats::registerMetric(MetricSpec const& spec)
metricSpecs[spec.metricId] = spec;
metricsNames[spec.metricId] = spec.name;
metrics[spec.metricId] = spec.defaultValue;
if (metricSpecs[spec.metricId].scope == Scope::Online) {
metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval);
}
int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime};
updated[spec.metricId] = spec.sendInitialValue;
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("configuration,cfg", bpo::value<std::string>(), "configuration connection string") //
("driver-client-backend", bpo::value<std::string>(), "driver connection string") //
("monitoring-backend", bpo::value<std::string>(), "monitoring connection string") //
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>(), "minimum flushing interval for online metrics (in s)") //
("infologger-mode", bpo::value<std::string>(), "O2_INFOLOGGER_MODE override") //
("infologger-severity", bpo::value<std::string>(), "minimun FairLogger severity which goes to info logger") //
("dpl-tracing-flags", bpo::value<std::string>(), "pipe separated list of events to trace") //
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions);
char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
("dpl-stats-min-online-publishing-interval", bpo::value<std::string>()->default_value("0"), "minimum flushing interval for online metrics (in s)") //
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ TEST_CASE("TestComputingQuotaEvaluator")
};

DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});

ServiceRegistry registry;
ServiceRegistryRef ref(registry);
Expand Down
12 changes: 6 additions & 6 deletions Framework/Core/test/test_DataProcessingStats.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace o2::framework;
TEST_CASE("DataProcessingStats")
{
DataProcessingStats stats(TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});

o2::framework::clean_all_runtime_errors();
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
Expand Down Expand Up @@ -190,7 +190,7 @@ TEST_CASE("DataProcessingStatsOutOfOrder")
int64_t value[] = {0, 1000, 999, 998};
return base + value[count++] - offset;
};
DataProcessingStats stats(realtimeTime, cpuTime);
DataProcessingStats stats(realtimeTime, cpuTime, {});
// Notice this will consume one value in the cpuTime.
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2});
Expand Down Expand Up @@ -222,7 +222,7 @@ TEST_CASE("DataProcessingStatsInstantaneousRate")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 0);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
Expand Down Expand Up @@ -265,7 +265,7 @@ TEST_CASE("DataProcessingStatsCumulativeRate")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator);
DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
Expand Down Expand Up @@ -310,7 +310,7 @@ TEST_CASE("DataProcessingStatsPublishing")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 5000});
stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2, .minPublishInterval = 2000});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
Expand Down Expand Up @@ -355,7 +355,7 @@ TEST_CASE("DataProcessingStatsPublishingRepeated")

// I want to push deltas since the last update and have the immediate time
// averaged being stored.
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp);
DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 3000, .maxRefreshLatency = 9000});
REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST_CASE("DataRelayer")
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
DataProcessingStats stats(
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()));
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
int quickUpdateInterval = 1;
using MetricSpec = DataProcessingStats::MetricSpec;
std::vector<MetricSpec> specs{
Expand Down

0 comments on commit 2a47fa0

Please sign in to comment.