Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[output.wavefront] Introduced "immediate_flush" flag #8165

Merged
merged 5 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/outputs/wavefront/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro
## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false

## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics
## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending
## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in
## Telegraf.
#immediate_flush = true
```


Expand Down
20 changes: 18 additions & 2 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Wavefront struct {
UseRegex bool
UseStrict bool
TruncateTags bool
ImmediateFlush bool
SourceOverride []string
StringToNumber map[string][]map[string]float64

Expand Down Expand Up @@ -101,6 +102,12 @@ var sampleConfig = `
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false

## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics
## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending
## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in
## Telegraf.
#immediate_flush = true

## Define a mapping, namespaced by metric prefix, from string values to numeric values
## deprecated in 1.9; use the enum processor plugin
#[[outputs.wavefront.string_to_number.elasticsearch]]
Expand All @@ -123,12 +130,16 @@ func (w *Wavefront) Connect() error {
w.Log.Warn("The string_to_number option is deprecated; please use the enum processor instead")
}

flushSeconds := 5
if w.ImmediateFlush {
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
}
if w.Url != "" {
w.Log.Debug("connecting over http/https using Url: %s", w.Url)
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{
Server: w.Url,
Token: w.Token,
FlushIntervalSeconds: 5,
FlushIntervalSeconds: flushSeconds,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url)
Expand All @@ -139,7 +150,7 @@ func (w *Wavefront) Connect() error {
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
Host: w.Host,
MetricsPort: w.Port,
FlushIntervalSeconds: 5,
FlushIntervalSeconds: flushSeconds,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
Expand All @@ -166,6 +177,10 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
}
}
}
if w.ImmediateFlush {
w.Log.Debugf("Flushing batch of %d points", len(metrics))
return w.sender.Flush()
}
return nil
}

Expand Down Expand Up @@ -336,6 +351,7 @@ func init() {
ConvertPaths: true,
ConvertBool: true,
TruncateTags: false,
ImmediateFlush: true,
}
})
}