From e9d24b0271d4e712dee60279de4453a0107a46a0 Mon Sep 17 00:00:00 2001 From: arriven <20084245+Arriven@users.noreply.github.com> Date: Tue, 2 Aug 2022 19:52:14 +0300 Subject: [PATCH] fix non-cumulative stats report on config update --- src/job/runner.go | 28 +++++++++++++++------------- src/utils/metrics/reporter.go | 30 +++++++----------------------- src/utils/metrics/stats_tracker.go | 20 ++++++++++++++++++++ 3 files changed, 42 insertions(+), 36 deletions(-) create mode 100644 src/utils/metrics/stats_tracker.go diff --git a/src/job/runner.go b/src/job/runner.go index d50d2d38..8fd05eda 100644 --- a/src/job/runner.go +++ b/src/job/runner.go @@ -91,9 +91,10 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { defer refreshTimer.Stop() metrics.IncClient() - var cancel context.CancelFunc - - var metric *metrics.Metrics + var ( + cancel context.CancelFunc + tracker *metrics.StatsTracker + ) for { rawConfig := config.FetchRawMultiConfig(logger, strings.Split(r.cfgOptions.PathsCSV, ","), @@ -111,7 +112,8 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { cancel() } - metric = &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics + metric := &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics + tracker = metrics.NewStatsTracker(metric) if rawConfig.Protected { logger.Info("config is protected, disabling logs") @@ -135,7 +137,7 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) { return } - reportMetrics(r.reporter, metric, r.globalJobsCfg.ClientID, logger) + reportMetrics(r.reporter, tracker, r.globalJobsCfg.ClientID, logger) } } @@ -171,6 +173,9 @@ func computeCount(count int, scaleFactor float64) int { func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *metrics.Metrics, logger *zap.Logger) (cancel context.CancelFunc) { ctx, cancel = context.WithCancel(ctx) + ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS) + ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH) + ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version) var jobInstancesCount int @@ -198,10 +203,6 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m } ctx := context.WithValue(ctx, templates.ContextKey("config"), cfgMap) - ctx = context.WithValue(ctx, templates.ContextKey("global_config"), r.globalJobsCfg) - ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS) - ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH) - ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version) ctx = context.WithValue(ctx, templates.ContextKey("metrics"), metric) for j := 0; j < cfg.Jobs[i].Count; j++ { @@ -229,11 +230,12 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m return cancel } -func reportMetrics(reporter metrics.Reporter, metric *metrics.Metrics, clientID string, logger *zap.Logger) { - if reporter != nil && metric != nil { - reporter.WriteSummary(metric) +func reportMetrics(reporter metrics.Reporter, tracker *metrics.StatsTracker, clientID string, logger *zap.Logger) { + if reporter != nil && tracker != nil { + reporter.WriteSummary(tracker) - if err := metrics.ReportStatistics(int64(metric.Sum(metrics.BytesSentStat)), clientID); err != nil { + // TODO: get rid of this + if err := metrics.ReportStatistics(0, clientID); err != nil { logger.Debug("error reporting statistics", zap.Error(err)) } } diff --git a/src/utils/metrics/reporter.go b/src/utils/metrics/reporter.go index f26a684a..623479a4 100644 --- a/src/utils/metrics/reporter.go +++ b/src/utils/metrics/reporter.go @@ -13,7 +13,7 @@ import ( // Concurrency-safe. type Reporter interface { // WriteSummary dumps Reporter contents into the target. - WriteSummary(*Metrics) + WriteSummary(*StatsTracker) } // ZapReporter @@ -21,7 +21,6 @@ type Reporter interface { type ZapReporter struct { logger *zap.Logger groupTargets bool - tracker statsTracker } // NewZapReporter creates a new Reporter using a zap logger. @@ -29,8 +28,8 @@ func NewZapReporter(logger *zap.Logger, groupTargets bool) Reporter { return &ZapReporter{logger: logger, groupTargets: groupTargets} } -func (r *ZapReporter) WriteSummary(metrics *Metrics) { - stats, totals, statsInterval, totalsInterval := r.tracker.sumStats(metrics, r.groupTargets) +func (r *ZapReporter) WriteSummary(tracker *StatsTracker) { + stats, totals, statsInterval, totalsInterval := tracker.sumStats(r.groupTargets) r.logger.Info("stats", zap.Object("total", &totals), zap.Object("targets", stats), zap.Object("total_since_last_report", &totalsInterval), zap.Object("targets_since_last_report", statsInterval)) @@ -41,7 +40,6 @@ func (r *ZapReporter) WriteSummary(metrics *Metrics) { type ConsoleReporter struct { target *bufio.Writer groupTargets bool - tracker statsTracker } // NewConsoleReporter creates a new Reporter which outputs straight to the console @@ -49,17 +47,17 @@ func NewConsoleReporter(target io.Writer, groupTargets bool) Reporter { return &ConsoleReporter{target: bufio.NewWriter(target), groupTargets: groupTargets} } -func (r *ConsoleReporter) WriteSummary(metrics *Metrics) { +func (r *ConsoleReporter) WriteSummary(tracker *StatsTracker) { writer := tabwriter.NewWriter(r.target, 1, 1, 1, ' ', tabwriter.AlignRight) - r.writeSummaryTo(metrics, writer) + r.writeSummaryTo(tracker, writer) // Important to flush the remains of bufio.Writer r.target.Flush() } -func (r *ConsoleReporter) writeSummaryTo(metrics *Metrics, writer *tabwriter.Writer) { - stats, totals, statsInterval, totalsInterval := r.tracker.sumStats(metrics, r.groupTargets) +func (r *ConsoleReporter) writeSummaryTo(tracker *StatsTracker, writer *tabwriter.Writer) { + stats, totals, statsInterval, totalsInterval := tracker.sumStats(r.groupTargets) defer writer.Flush() @@ -89,17 +87,3 @@ func printStatsRow(writer *tabwriter.Writer, rowName string, stats Stats, diff S float64(diff[BytesReceivedStat])/BytesInMegabyte, float64(stats[BytesReceivedStat])/BytesInMegabyte, ) } - -// statsTracker generalizes tracking stats changes between reports -type statsTracker struct { - lastStats PerTargetStats - lastTotals Stats -} - -func (st *statsTracker) sumStats(metrics *Metrics, groupTargets bool) (stats PerTargetStats, totals Stats, statsInterval PerTargetStats, totalsInterval Stats) { - stats, totals = metrics.SumAllStats(groupTargets) - statsInterval, totalsInterval = stats.Diff(st.lastStats), Diff(totals, st.lastTotals) - st.lastStats, st.lastTotals = stats, totals - - return -} diff --git a/src/utils/metrics/stats_tracker.go b/src/utils/metrics/stats_tracker.go new file mode 100644 index 00000000..4144f7f2 --- /dev/null +++ b/src/utils/metrics/stats_tracker.go @@ -0,0 +1,20 @@ +package metrics + +func NewStatsTracker(metrics *Metrics) *StatsTracker { + return &StatsTracker{metrics: metrics} +} + +// StatsTracker generalizes tracking stats changes between reports +type StatsTracker struct { + lastStats PerTargetStats + lastTotals Stats + metrics *Metrics +} + +func (st *StatsTracker) sumStats(groupTargets bool) (stats PerTargetStats, totals Stats, statsInterval PerTargetStats, totalsInterval Stats) { + stats, totals = st.metrics.SumAllStats(groupTargets) + statsInterval, totalsInterval = stats.Diff(st.lastStats), Diff(totals, st.lastTotals) + st.lastStats, st.lastTotals = stats, totals + + return +}