From a0ae5daf1115b13038dc1b0fa2e57c2a1d260f1c Mon Sep 17 00:00:00 2001 From: Piotr Date: Thu, 18 Jan 2018 11:35:23 +0100 Subject: [PATCH] aggregator: move time.now into the aggregator By the time the aggregator.run() was called about 600ms already passed since setting now which was skewing up the aggregation intervals and skipping metrics. --- agent/agent.go | 4 +--- internal/models/running_aggregator.go | 2 +- internal/models/running_aggregator_test.go | 6 +++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index af96718cd405a..8739f941ddb04 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -364,8 +364,6 @@ func (a *Agent) Run(shutdown chan struct{}) error { metricC := make(chan telegraf.Metric, 100) aggC := make(chan telegraf.Metric, 100) - now := time.Now() - // Start all ServicePlugins for _, input := range a.Config.Inputs { input.SetDefaultTags(a.Config.Tags) @@ -406,7 +404,7 @@ func (a *Agent) Run(shutdown chan struct{}) error { acc := NewAccumulator(agg, aggC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) - agg.Run(acc, now, shutdown) + agg.Run(acc, shutdown) }(aggregator) } diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 91e5334e8f2b1..8189a6667aa2d 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -114,7 +114,6 @@ func (r *RunningAggregator) reset() { // for period ticks to tell it when to push and reset the aggregator. func (r *RunningAggregator) Run( acc telegraf.Accumulator, - now time.Time, shutdown chan struct{}, ) { // The start of the period is truncated to the nearest second. @@ -133,6 +132,7 @@ func (r *RunningAggregator) Run( // 2nd interval: 00:10 - 00:20.5 // etc. // + now := time.Now() r.periodStart = now.Truncate(time.Second) truncation := now.Sub(r.periodStart) r.periodEnd = r.periodStart.Add(r.Config.Period) diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index cb56dc4ef89bc..30279f0ee1c28 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -24,7 +24,7 @@ func TestAdd(t *testing.T) { }) assert.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} - go ra.Run(&acc, time.Now(), make(chan struct{})) + go ra.Run(&acc, make(chan struct{})) m := ra.MakeMetric( "RITest", @@ -55,7 +55,7 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { }) assert.NoError(t, ra.Config.Filter.Compile()) acc := testutil.Accumulator{} - go ra.Run(&acc, time.Now(), make(chan struct{})) + go ra.Run(&acc, make(chan struct{})) // metric before current period m := ra.MakeMetric( @@ -113,7 +113,7 @@ func TestAddAndPushOnePeriod(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - ra.Run(&acc, time.Now(), shutdown) + ra.Run(&acc, shutdown) }() m := ra.MakeMetric(