From b5e57e61f3cfdd8a5d3e6bca14176cf7a1096a8a Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 30 Jun 2020 23:15:11 -0700 Subject: [PATCH] Allow overriding the collection_jitter and precision per input (#7762) --- agent/agent.go | 62 ++++++++++++++++------ config/config.go | 108 ++++++++++++++------------------------- docs/CONFIGURATION.md | 26 ++++++++-- models/running_input.go | 8 +-- models/running_output.go | 2 +- 5 files changed, 111 insertions(+), 95 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 0ff5893785293..4a9b43b22e386 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -246,12 +246,20 @@ func (a *Agent) startInputs( for _, input := range inputs { if si, ok := input.Input.(telegraf.ServiceInput); ok { - // Service input plugins are not subject to timestamp rounding. + // Service input plugins are not normally subject to timestamp + // rounding except for when precision is set on the input plugin. + // // This only applies to the accumulator passed to Start(), the // Gather() accumulator does apply rounding according to the - // precision agent setting. + // precision and interval agent/plugin settings. + var interval time.Duration + var precision time.Duration + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + acc := NewAccumulator(input, dst) - acc.SetPrecision(time.Nanosecond) + acc.SetPrecision(getPrecision(precision, interval)) err := si.Start(acc) if err != nil { @@ -276,14 +284,24 @@ func (a *Agent) runInputs( ) error { var wg sync.WaitGroup for _, input := range unit.inputs { - interval := a.Config.Agent.Interval.Duration - jitter := a.Config.Agent.CollectionJitter.Duration - // Overwrite agent interval if this plugin has its own. + interval := a.Config.Agent.Interval.Duration if input.Config.Interval != 0 { interval = input.Config.Interval } + // Overwrite agent precision if this plugin has its own. 
+ precision := a.Config.Agent.Precision.Duration + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + + // Overwrite agent collection_jitter if this plugin has its own. + jitter := a.Config.Agent.CollectionJitter.Duration + if input.Config.CollectionJitter != 0 { + jitter = input.Config.CollectionJitter + } + var ticker Ticker if a.Config.Agent.RoundInterval { ticker = NewAlignedTicker(startTime, interval, jitter) @@ -293,7 +311,7 @@ func (a *Agent) runInputs( defer ticker.Stop() acc := NewAccumulator(input, unit.dst) - acc.SetPrecision(a.Precision()) + acc.SetPrecision(getPrecision(precision, interval)) wg.Add(1) go func(input *models.RunningInput) { @@ -368,12 +386,24 @@ func (a *Agent) testRunInputs( go func(input *models.RunningInput) { defer wg.Done() + // Overwrite agent interval if this plugin has its own. + interval := a.Config.Agent.Interval.Duration + if input.Config.Interval != 0 { + interval = input.Config.Interval + } + + // Overwrite agent precision if this plugin has its own. + precision := a.Config.Agent.Precision.Duration + if input.Config.Precision != 0 { + precision = input.Config.Precision + } + // Run plugins that require multiple gathers to calculate rate // and delta metrics twice. 
switch input.Config.Name { case "cpu", "mongodb", "procstat": nulAcc := NewAccumulator(input, nul) - nulAcc.SetPrecision(a.Precision()) + nulAcc.SetPrecision(getPrecision(precision, interval)) if err := input.Input.Gather(nulAcc); err != nil { nulAcc.AddError(err) } @@ -382,7 +412,7 @@ func (a *Agent) testRunInputs( } acc := NewAccumulator(input, unit.dst) - acc.SetPrecision(a.Precision()) + acc.SetPrecision(getPrecision(precision, interval)) if err := input.Input.Gather(acc); err != nil { acc.AddError(err) @@ -580,8 +610,11 @@ func (a *Agent) runAggregators( go func(agg *models.RunningAggregator) { defer wg.Done() + interval := a.Config.Agent.Interval.Duration + precision := a.Config.Agent.Precision.Duration + acc := NewAccumulator(agg, unit.aggC) - acc.SetPrecision(a.Precision()) + acc.SetPrecision(getPrecision(precision, interval)) a.push(ctx, agg, acc) }(agg) } @@ -705,8 +738,8 @@ func (a *Agent) runOutputs( jitter := jitter // Overwrite agent flush_jitter if this plugin has its own. - if output.Config.FlushJitter != nil { - jitter = *output.Config.FlushJitter + if output.Config.FlushJitter != 0 { + jitter = output.Config.FlushJitter } wg.Add(1) @@ -1063,10 +1096,7 @@ func (a *Agent) once(ctx context.Context, wait time.Duration) error { } // Returns the rounding precision for metrics. 
-func (a *Agent) Precision() time.Duration { - precision := a.Config.Agent.Precision.Duration - interval := a.Config.Agent.Interval.Duration - +func getPrecision(precision, interval time.Duration) time.Duration { if precision > 0 { return precision } diff --git a/config/config.go b/config/config.go index 18fe7066ac71e..b70931e76921d 100644 --- a/config/config.go +++ b/config/config.go @@ -1075,44 +1075,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err Grace: time.Second * 0, } - if node, ok := tbl.Fields["period"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - conf.Period = dur - } - } + if err := getConfigDuration(tbl, "period", &conf.Period); err != nil { + return nil, err } - if node, ok := tbl.Fields["delay"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - conf.Delay = dur - } - } + if err := getConfigDuration(tbl, "delay", &conf.Delay); err != nil { + return nil, err } - if node, ok := tbl.Fields["grace"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - conf.Grace = dur - } - } + if err := getConfigDuration(tbl, "grace", &conf.Grace); err != nil { + return nil, err } + if node, ok := tbl.Fields["drop_original"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if b, ok := kv.Value.(*ast.Boolean); ok { @@ -1166,9 +1140,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err } } - delete(tbl.Fields, "period") - delete(tbl.Fields, "delay") - delete(tbl.Fields, "grace") delete(tbl.Fields, "drop_original") delete(tbl.Fields, "name_prefix") delete(tbl.Fields, "name_suffix") @@ -1361,17 +1332,17 @@ func 
buildFilter(tbl *ast.Table) (models.Filter, error) { // models.InputConfig to be inserted into models.RunningInput func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { cp := &models.InputConfig{Name: name} - if node, ok := tbl.Fields["interval"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - cp.Interval = dur - } - } + if err := getConfigDuration(tbl, "interval", &cp.Interval); err != nil { + return nil, err + } + + if err := getConfigDuration(tbl, "precision", &cp.Precision); err != nil { + return nil, err + } + + if err := getConfigDuration(tbl, "collection_jitter", &cp.CollectionJitter); err != nil { + return nil, err } if node, ok := tbl.Fields["name_prefix"]; ok { @@ -1419,7 +1390,6 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) { delete(tbl.Fields, "name_suffix") delete(tbl.Fields, "name_override") delete(tbl.Fields, "alias") - delete(tbl.Fields, "interval") delete(tbl.Fields, "tags") var err error cp.Filter, err = buildFilter(tbl) @@ -2141,30 +2111,12 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { oc.Filter.NamePass = oc.Filter.FieldPass } - if node, ok := tbl.Fields["flush_interval"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - - oc.FlushInterval = dur - } - } + if err := getConfigDuration(tbl, "flush_interval", &oc.FlushInterval); err != nil { + return nil, err } - if node, ok := tbl.Fields["flush_jitter"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - dur, err := time.ParseDuration(str.Value) - if err != nil { - return nil, err - } - oc.FlushJitter = new(time.Duration) - *oc.FlushJitter = dur - } - } + if err := getConfigDuration(tbl, "flush_jitter", 
&oc.FlushJitter); err != nil { + return nil, err } if node, ok := tbl.Fields["metric_buffer_limit"]; ok { @@ -2223,8 +2175,6 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { } } - delete(tbl.Fields, "flush_interval") - delete(tbl.Fields, "flush_jitter") delete(tbl.Fields, "metric_buffer_limit") delete(tbl.Fields, "metric_batch_size") delete(tbl.Fields, "alias") @@ -2241,3 +2191,19 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) { type unwrappable interface { Unwrap() telegraf.Processor } + +func getConfigDuration(tbl *ast.Table, key string, target *time.Duration) error { + if node, ok := tbl.Fields[key]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + d, err := time.ParseDuration(str.Value) + if err != nil { + return err + } + delete(tbl.Fields, key) + *target = d + } + } + } + return nil +} diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index b0433a859130e..53233c598c3ed 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -132,7 +132,6 @@ The agent table configures Telegraf and the defaults used across all plugins. running a large number of telegraf instances. ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s. - - **precision**: Collected metrics are rounded to the precision specified as an [interval][]. @@ -194,13 +193,32 @@ driven operation. Parameters that can be used with any input plugin: - **alias**: Name an instance of a plugin. -- **interval**: How often to gather this metric. Normal plugins use a single - global interval, but if one particular input should be run less or more - often, you can configure that here. + +- **interval**: + Overrides the `interval` setting of the [agent][Agent] for the plugin. How + often to gather this metric. Normal plugins use a single global interval, but + if one particular input should be run less or more often, you can configure + that here. 
+ +- **precision**: + Overrides the `precision` setting of the [agent][Agent] for the plugin. + Collected metrics are rounded to the precision specified as an [interval][]. + + When this value is set on a service input, multiple events occurring at the + same timestamp may be merged by the output database. + +- **collection_jitter**: + Overrides the `collection_jitter` setting of the [agent][Agent] for the + plugin. Collection jitter is used to jitter the collection by a random + [interval][]. + - **name_override**: Override the base name of the measurement. (Default is the name of the input). + - **name_prefix**: Specifies a prefix to attach to the measurement name. + - **name_suffix**: Specifies a suffix to attach to the measurement name. + - **tags**: A map of tags to apply to a specific input's measurements. The [metric filtering][] parameters can be used to limit what metrics are diff --git a/models/running_input.go b/models/running_input.go index bb1033fdd7800..34eba098699f3 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -56,9 +56,11 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { // InputConfig is the common config for all inputs. type InputConfig struct { - Name string - Alias string - Interval time.Duration + Name string + Alias string + Interval time.Duration + CollectionJitter time.Duration + Precision time.Duration NameOverride string MeasurementPrefix string diff --git a/models/running_output.go b/models/running_output.go index 452ab796bc12f..dd79625036750 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -24,7 +24,7 @@ type OutputConfig struct { Filter Filter FlushInterval time.Duration - FlushJitter *time.Duration + FlushJitter time.Duration MetricBufferLimit int MetricBatchSize int