Skip to content

Commit

Permalink
fix non-cumulative stats report on config update
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven committed Aug 2, 2022
1 parent 26b6d3e commit e9d24b0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 36 deletions.
28 changes: 15 additions & 13 deletions src/job/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
defer refreshTimer.Stop()
metrics.IncClient()

var cancel context.CancelFunc

var metric *metrics.Metrics
var (
cancel context.CancelFunc
tracker *metrics.StatsTracker
)

for {
rawConfig := config.FetchRawMultiConfig(logger, strings.Split(r.cfgOptions.PathsCSV, ","),
Expand All @@ -111,7 +112,8 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
cancel()
}

metric = &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics
metric := &metrics.Metrics{} // clear info about previous targets and avoid old jobs from dumping old info to new metrics
tracker = metrics.NewStatsTracker(metric)

if rawConfig.Protected {
logger.Info("config is protected, disabling logs")
Expand All @@ -135,7 +137,7 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger) {
return
}

reportMetrics(r.reporter, metric, r.globalJobsCfg.ClientID, logger)
reportMetrics(r.reporter, tracker, r.globalJobsCfg.ClientID, logger)
}
}

Expand Down Expand Up @@ -171,6 +173,9 @@ func computeCount(count int, scaleFactor float64) int {

func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *metrics.Metrics, logger *zap.Logger) (cancel context.CancelFunc) {
ctx, cancel = context.WithCancel(ctx)
ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS)
ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH)
ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version)

var jobInstancesCount int

Expand Down Expand Up @@ -198,10 +203,6 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m
}

ctx := context.WithValue(ctx, templates.ContextKey("config"), cfgMap)
ctx = context.WithValue(ctx, templates.ContextKey("global_config"), r.globalJobsCfg)
ctx = context.WithValue(ctx, templates.ContextKey("goos"), runtime.GOOS)
ctx = context.WithValue(ctx, templates.ContextKey("goarch"), runtime.GOARCH)
ctx = context.WithValue(ctx, templates.ContextKey("version"), ota.Version)
ctx = context.WithValue(ctx, templates.ContextKey("metrics"), metric)

for j := 0; j < cfg.Jobs[i].Count; j++ {
Expand Down Expand Up @@ -229,11 +230,12 @@ func (r *Runner) runJobs(ctx context.Context, cfg *config.MultiConfig, metric *m
return cancel
}

func reportMetrics(reporter metrics.Reporter, metric *metrics.Metrics, clientID string, logger *zap.Logger) {
if reporter != nil && metric != nil {
reporter.WriteSummary(metric)
func reportMetrics(reporter metrics.Reporter, tracker *metrics.StatsTracker, clientID string, logger *zap.Logger) {
if reporter != nil && tracker != nil {
reporter.WriteSummary(tracker)

if err := metrics.ReportStatistics(int64(metric.Sum(metrics.BytesSentStat)), clientID); err != nil {
// TODO: get rid of this
if err := metrics.ReportStatistics(0, clientID); err != nil {
logger.Debug("error reporting statistics", zap.Error(err))
}
}
Expand Down
30 changes: 7 additions & 23 deletions src/utils/metrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@ import (
// Concurrency-safe.
type Reporter interface {
// WriteSummary dumps Reporter contents into the target.
WriteSummary(*Metrics)
WriteSummary(*StatsTracker)
}

// ZapReporter

type ZapReporter struct {
logger *zap.Logger
groupTargets bool
tracker statsTracker
}

// NewZapReporter creates a new Reporter using a zap logger.
func NewZapReporter(logger *zap.Logger, groupTargets bool) Reporter {
return &ZapReporter{logger: logger, groupTargets: groupTargets}
}

func (r *ZapReporter) WriteSummary(metrics *Metrics) {
stats, totals, statsInterval, totalsInterval := r.tracker.sumStats(metrics, r.groupTargets)
func (r *ZapReporter) WriteSummary(tracker *StatsTracker) {
stats, totals, statsInterval, totalsInterval := tracker.sumStats(r.groupTargets)

r.logger.Info("stats", zap.Object("total", &totals), zap.Object("targets", stats),
zap.Object("total_since_last_report", &totalsInterval), zap.Object("targets_since_last_report", statsInterval))
Expand All @@ -41,25 +40,24 @@ func (r *ZapReporter) WriteSummary(metrics *Metrics) {
type ConsoleReporter struct {
target *bufio.Writer
groupTargets bool
tracker statsTracker
}

// NewConsoleReporter creates a new Reporter which outputs straight to the console
func NewConsoleReporter(target io.Writer, groupTargets bool) Reporter {
return &ConsoleReporter{target: bufio.NewWriter(target), groupTargets: groupTargets}
}

func (r *ConsoleReporter) WriteSummary(metrics *Metrics) {
func (r *ConsoleReporter) WriteSummary(tracker *StatsTracker) {
writer := tabwriter.NewWriter(r.target, 1, 1, 1, ' ', tabwriter.AlignRight)

r.writeSummaryTo(metrics, writer)
r.writeSummaryTo(tracker, writer)

// Important to flush the remains of bufio.Writer
r.target.Flush()
}

func (r *ConsoleReporter) writeSummaryTo(metrics *Metrics, writer *tabwriter.Writer) {
stats, totals, statsInterval, totalsInterval := r.tracker.sumStats(metrics, r.groupTargets)
func (r *ConsoleReporter) writeSummaryTo(tracker *StatsTracker, writer *tabwriter.Writer) {
stats, totals, statsInterval, totalsInterval := tracker.sumStats(r.groupTargets)

defer writer.Flush()

Expand Down Expand Up @@ -89,17 +87,3 @@ func printStatsRow(writer *tabwriter.Writer, rowName string, stats Stats, diff S
float64(diff[BytesReceivedStat])/BytesInMegabyte, float64(stats[BytesReceivedStat])/BytesInMegabyte,
)
}

// statsTracker generalizes tracking stats changes between reports
type statsTracker struct {
lastStats PerTargetStats
lastTotals Stats
}

func (st *statsTracker) sumStats(metrics *Metrics, groupTargets bool) (stats PerTargetStats, totals Stats, statsInterval PerTargetStats, totalsInterval Stats) {
stats, totals = metrics.SumAllStats(groupTargets)
statsInterval, totalsInterval = stats.Diff(st.lastStats), Diff(totals, st.lastTotals)
st.lastStats, st.lastTotals = stats, totals

return
}
20 changes: 20 additions & 0 deletions src/utils/metrics/stats_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package metrics

func NewStatsTracker(metrics *Metrics) *StatsTracker {
return &StatsTracker{metrics: metrics}
}

// StatsTracker generalizes tracking stats changes between reports
type StatsTracker struct {
lastStats PerTargetStats
lastTotals Stats
metrics *Metrics
}

func (st *StatsTracker) sumStats(groupTargets bool) (stats PerTargetStats, totals Stats, statsInterval PerTargetStats, totalsInterval Stats) {
stats, totals = st.metrics.SumAllStats(groupTargets)
statsInterval, totalsInterval = stats.Diff(st.lastStats), Diff(totals, st.lastTotals)
st.lastStats, st.lastTotals = stats, totals

return
}

0 comments on commit e9d24b0

Please sign in to comment.