diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index dd2512ef305b8..1b101b02de604 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -325,6 +325,10 @@ Parameters that can be used with any aggregator plugin: how long for aggregators to wait before receiving metrics from input plugins, in the case that aggregators are flushing and inputs are gathering on the same interval. +- **grace**: The duration when the metrics will still be aggregated + by the plugin, even though they're outside of the aggregation period. This + is needed in a situation when the agent is expected to receive late metrics + and it's acceptable to roll them up into next aggregation period. - **drop_original**: If true, the original metric will be dropped by the aggregator and will not get sent to the output plugins. - **name_override**: Override the base name of the measurement. (Default is diff --git a/internal/config/config.go b/internal/config/config.go index a5315b9b6b5bc..d8d545734dc6a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1025,6 +1025,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err Name: name, Delay: time.Millisecond * 100, Period: time.Second * 30, + Grace: time.Second * 0, } if node, ok := tbl.Fields["period"]; ok { @@ -1053,6 +1054,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } + if node, ok := tbl.Fields["grace"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + dur, err := time.ParseDuration(str.Value) + if err != nil { + return nil, err + } + + conf.Grace = dur + } + } + } if node, ok := tbl.Fields["drop_original"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if b, ok := kv.Value.(*ast.Boolean); ok { @@ -1100,6 +1113,7 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err delete(tbl.Fields, "period") delete(tbl.Fields, "delay") + delete(tbl.Fields, "grace") delete(tbl.Fields, "drop_original") delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") diff --git a/internal/models/running_aggregator.go b/internal/models/running_aggregator.go index 8bd983eefe2dc..e029aad569597 100644 --- a/internal/models/running_aggregator.go +++ b/internal/models/running_aggregator.go @@ -59,6 +59,7 @@ type AggregatorConfig struct { DropOriginal bool Period time.Duration Delay time.Duration + Grace time.Duration NameOverride string MeasurementPrefix string @@ -135,9 +136,9 @@ func (r *RunningAggregator) Add(m telegraf.Metric) bool { r.Lock() defer r.Unlock() - if m.Time().Before(r.periodStart) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) { - log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s", - r.Name(), m.Time(), r.periodStart, r.periodEnd) + if m.Time().Before(r.periodStart.Add(-r.Config.Grace)) || m.Time().After(r.periodEnd.Add(r.Config.Delay)) { + log.Printf("D! [%s] metric is outside aggregation window; discarding. %s: m: %s e: %s g: %s", + r.Name(), m.Time(), r.periodStart, r.periodEnd, r.Config.Grace) r.MetricsDropped.Incr(1) return r.Config.DropOriginal } diff --git a/internal/models/running_aggregator_test.go b/internal/models/running_aggregator_test.go index 19476eecfbc5a..83b9dea0a8124 100644 --- a/internal/models/running_aggregator_test.go +++ b/internal/models/running_aggregator_test.go @@ -89,6 +89,68 @@ func TestAddMetricsOutsideCurrentPeriod(t *testing.T) { require.Equal(t, int64(101), acc.Metrics[0].Fields["sum"]) } +func TestAddMetricsOutsideCurrentPeriodWithGrace(t *testing.T) { + a := &TestAggregator{} + ra := NewRunningAggregator(a, &AggregatorConfig{ + Name: "TestRunningAggregator", + Filter: Filter{ + NamePass: []string{"*"}, + }, + Period: time.Millisecond * 1500, + Grace: time.Millisecond * 500, + }) + require.NoError(t, ra.Config.Filter.Compile()) + acc := testutil.Accumulator{} + now := time.Now() + ra.UpdateWindow(now, now.Add(ra.Config.Period)) + + m := testutil.MustMetric("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(101), + }, + now.Add(-time.Hour), + telegraf.Untyped, + ) + require.False(t, ra.Add(m)) + + // metric before current period (late) + m = testutil.MustMetric("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(100), + }, + now.Add(-time.Millisecond*1000), + telegraf.Untyped, + ) + require.False(t, ra.Add(m)) + + // metric before current period, but within grace period (late) + m = testutil.MustMetric("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(102), + }, + now.Add(-time.Millisecond*200), + telegraf.Untyped, + ) + require.False(t, ra.Add(m)) + + // "now" metric + m = testutil.MustMetric("RITest", + map[string]string{}, + map[string]interface{}{ + "value": int64(101), + }, + time.Now().Add(time.Millisecond*50), + telegraf.Untyped) + require.False(t, ra.Add(m)) + + ra.Push(&acc) + require.Equal(t, 1, len(acc.Metrics)) + require.Equal(t, int64(203), acc.Metrics[0].Fields["sum"]) +} + func TestAddAndPushOnePeriod(t *testing.T) { a := &TestAggregator{} ra := NewRunningAggregator(a, &AggregatorConfig{