Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow overriding the collection_jitter and precision per input #7762

Merged
merged 2 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,20 @@ func (a *Agent) startInputs(

for _, input := range inputs {
if si, ok := input.Input.(telegraf.ServiceInput); ok {
// Service input plugins are not subject to timestamp rounding.
// Service input plugins are not normally subject to timestamp
// rounding except for when precision is set on the input plugin.
//
// This only applies to the accumulator passed to Start(), the
// Gather() accumulator does apply rounding according to the
// precision agent setting.
// precision and interval agent/plugin settings.
var interval time.Duration
var precision time.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

acc := NewAccumulator(input, dst)
acc.SetPrecision(time.Nanosecond)
acc.SetPrecision(getPrecision(precision, interval))

err := si.Start(acc)
if err != nil {
Expand All @@ -276,14 +284,24 @@ func (a *Agent) runInputs(
) error {
var wg sync.WaitGroup
for _, input := range unit.inputs {
interval := a.Config.Agent.Interval.Duration
jitter := a.Config.Agent.CollectionJitter.Duration

// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

// Overwrite agent collection_jitter if this plugin has its own.
jitter := a.Config.Agent.CollectionJitter.Duration
if input.Config.CollectionJitter != 0 {
jitter = input.Config.CollectionJitter
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
Expand All @@ -293,7 +311,7 @@ func (a *Agent) runInputs(
defer ticker.Stop()

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))

wg.Add(1)
go func(input *models.RunningInput) {
Expand Down Expand Up @@ -368,12 +386,24 @@ func (a *Agent) testRunInputs(
go func(input *models.RunningInput) {
defer wg.Done()

// Overwrite agent interval if this plugin has its own.
interval := a.Config.Agent.Interval.Duration
if input.Config.Interval != 0 {
interval = input.Config.Interval
}

// Overwrite agent precision if this plugin has its own.
precision := a.Config.Agent.Precision.Duration
if input.Config.Precision != 0 {
precision = input.Config.Precision
}

// Run plugins that require multiple gathers to calculate rate
// and delta metrics twice.
switch input.Config.Name {
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nul)
nulAcc.SetPrecision(a.Precision())
nulAcc.SetPrecision(getPrecision(precision, interval))
if err := input.Input.Gather(nulAcc); err != nil {
nulAcc.AddError(err)
}
Expand All @@ -382,7 +412,7 @@ func (a *Agent) testRunInputs(
}

acc := NewAccumulator(input, unit.dst)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))

if err := input.Input.Gather(acc); err != nil {
acc.AddError(err)
Expand Down Expand Up @@ -580,8 +610,11 @@ func (a *Agent) runAggregators(
go func(agg *models.RunningAggregator) {
defer wg.Done()

interval := a.Config.Agent.Interval.Duration
precision := a.Config.Agent.Precision.Duration

acc := NewAccumulator(agg, unit.aggC)
acc.SetPrecision(a.Precision())
acc.SetPrecision(getPrecision(precision, interval))
a.push(ctx, agg, acc)
}(agg)
}
Expand Down Expand Up @@ -705,8 +738,8 @@ func (a *Agent) runOutputs(

jitter := jitter
// Overwrite agent flush_jitter if this plugin has its own.
if output.Config.FlushJitter != nil {
jitter = *output.Config.FlushJitter
if output.Config.FlushJitter != 0 {
jitter = output.Config.FlushJitter
}

wg.Add(1)
Expand Down Expand Up @@ -1063,10 +1096,7 @@ func (a *Agent) once(ctx context.Context, wait time.Duration) error {
}

// Returns the rounding precision for metrics.
func (a *Agent) Precision() time.Duration {
precision := a.Config.Agent.Precision.Duration
interval := a.Config.Agent.Interval.Duration

func getPrecision(precision, interval time.Duration) time.Duration {
if precision > 0 {
return precision
}
Expand Down
108 changes: 37 additions & 71 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,44 +1075,18 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
Grace: time.Second * 0,
}

if node, ok := tbl.Fields["period"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Period = dur
}
}
if err := getConfigDuration(tbl, "period", &conf.Period); err != nil {
return nil, err
}

if node, ok := tbl.Fields["delay"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Delay = dur
}
}
if err := getConfigDuration(tbl, "delay", &conf.Delay); err != nil {
return nil, err
}

if node, ok := tbl.Fields["grace"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

conf.Grace = dur
}
}
if err := getConfigDuration(tbl, "grace", &conf.Grace); err != nil {
return nil, err
}

if node, ok := tbl.Fields["drop_original"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
Expand Down Expand Up @@ -1166,9 +1140,6 @@ func buildAggregator(name string, tbl *ast.Table) (*models.AggregatorConfig, err
}
}

delete(tbl.Fields, "period")
delete(tbl.Fields, "delay")
delete(tbl.Fields, "grace")
delete(tbl.Fields, "drop_original")
delete(tbl.Fields, "name_prefix")
delete(tbl.Fields, "name_suffix")
Expand Down Expand Up @@ -1361,17 +1332,17 @@ func buildFilter(tbl *ast.Table) (models.Filter, error) {
// models.InputConfig to be inserted into models.RunningInput
func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
cp := &models.InputConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
if err := getConfigDuration(tbl, "interval", &cp.Interval); err != nil {
return nil, err
}

if err := getConfigDuration(tbl, "precision", &cp.Precision); err != nil {
return nil, err
}

if err := getConfigDuration(tbl, "collection_jitter", &cp.CollectionJitter); err != nil {
return nil, err
}

if node, ok := tbl.Fields["name_prefix"]; ok {
Expand Down Expand Up @@ -1419,7 +1390,6 @@ func buildInput(name string, tbl *ast.Table) (*models.InputConfig, error) {
delete(tbl.Fields, "name_suffix")
delete(tbl.Fields, "name_override")
delete(tbl.Fields, "alias")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tags")
var err error
cp.Filter, err = buildFilter(tbl)
Expand Down Expand Up @@ -2141,30 +2111,12 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
oc.Filter.NamePass = oc.Filter.FieldPass
}

if node, ok := tbl.Fields["flush_interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

oc.FlushInterval = dur
}
}
if err := getConfigDuration(tbl, "flush_interval", &oc.FlushInterval); err != nil {
return nil, err
}

if node, ok := tbl.Fields["flush_jitter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}
oc.FlushJitter = new(time.Duration)
*oc.FlushJitter = dur
}
}
if err := getConfigDuration(tbl, "flush_jitter", &oc.FlushJitter); err != nil {
return nil, err
}

if node, ok := tbl.Fields["metric_buffer_limit"]; ok {
Expand Down Expand Up @@ -2223,8 +2175,6 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
}
}

delete(tbl.Fields, "flush_interval")
delete(tbl.Fields, "flush_jitter")
delete(tbl.Fields, "metric_buffer_limit")
delete(tbl.Fields, "metric_batch_size")
delete(tbl.Fields, "alias")
Expand All @@ -2241,3 +2191,19 @@ func buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, error) {
type unwrappable interface {
Unwrap() telegraf.Processor
}

func getConfigDuration(tbl *ast.Table, key string, target *time.Duration) error {
if node, ok := tbl.Fields[key]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
d, err := time.ParseDuration(str.Value)
if err != nil {
return err
}
delete(tbl.Fields, key)
*target = d
}
}
}
return nil
}
26 changes: 22 additions & 4 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ The agent table configures Telegraf and the defaults used across all plugins.
running a large number of telegraf instances. ie, a jitter of 5s and interval
10s means flushes will happen every 10-15s.


- **precision**:
Collected metrics are rounded to the precision specified as an [interval][].

Expand Down Expand Up @@ -194,13 +193,32 @@ driven operation.
Parameters that can be used with any input plugin:

- **alias**: Name an instance of a plugin.
- **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular input should be run less or more
often, you can configure that here.

- **interval**:
Overrides the `interval` setting of the [agent][Agent] for the plugin. How
often to gather this metric. Normal plugins use a single global interval, but
if one particular input should be run less or more often, you can configure
that here.

- **precision**:
Overrides the `precision` setting of the [agent][Agent] for the plugin.
Collected metrics are rounded to the precision specified as an [interval][].

When this value is set on a service input, multiple events occuring at the
same timestamp may be merged by the output database.

- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
[interval][].

- **name_override**: Override the base name of the measurement. (Default is
the name of the input).

- **name_prefix**: Specifies a prefix to attach to the measurement name.

- **name_suffix**: Specifies a suffix to attach to the measurement name.

- **tags**: A map of tags to apply to a specific input's measurements.

The [metric filtering][] parameters can be used to limit what metrics are
Expand Down
8 changes: 5 additions & 3 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {

// InputConfig is the common config for all inputs.
type InputConfig struct {
Name string
Alias string
Interval time.Duration
Name string
Alias string
Interval time.Duration
CollectionJitter time.Duration
Precision time.Duration

NameOverride string
MeasurementPrefix string
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type OutputConfig struct {
Filter Filter

FlushInterval time.Duration
FlushJitter *time.Duration
FlushJitter time.Duration
MetricBufferLimit int
MetricBatchSize int

Expand Down