Skip to content

Commit

Permalink
Add option to save retention policy as tag in influxdb_listener (infl…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Apr 21, 2020
1 parent a11d3f5 commit 183715d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 9 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ submits data to InfluxDB determines the destination database.
## the tag will be overwritten with the database supplied.
# database_tag = ""

## If set the retention policy specified in the write query will be added as
## the value of this tag name.
# retention_policy_tag = ""

## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
Expand Down
26 changes: 18 additions & 8 deletions plugins/inputs/influxdb_listener/influxdb_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ type InfluxDBListener struct {
port int
tlsint.ServerConfig

ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"` // deprecated in 1.14; ignored
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
RetentionPolicyTag string `toml:"retention_policy_tag"`

timeFunc influx.TimeFunc

Expand Down Expand Up @@ -72,12 +73,16 @@ const sampleConfig = `
## 0 means to use the default of 32MiB.
max_body_size = "32MiB"
## Optional tag name used to store the database.
## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""
## If set the retention policy specified in the write query will be added as
## the value of this tag name.
# retention_policy_tag = ""
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
Expand Down Expand Up @@ -255,6 +260,7 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
}

db := req.URL.Query().Get("db")
rp := req.URL.Query().Get("rp")

body := req.Body
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
Expand Down Expand Up @@ -316,6 +322,10 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
m.AddTag(h.DatabaseTag, db)
}

if h.RetentionPolicyTag != "" && rp != "" {
m.AddTag(h.RetentionPolicyTag, rp)
}

h.acc.AddMetric(m)

}
Expand Down
33 changes: 32 additions & 1 deletion plugins/inputs/influxdb_listener/influxdb_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -207,6 +207,37 @@ func TestWriteKeepDatabase(t *testing.T) {
}
}

func TestWriteRetentionPolicyTag(t *testing.T) {
listener := newTestListener()
listener.RetentionPolicyTag = "rp"

acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

resp, err := http.Post(createURL(listener, "http", "/write", "rp=myrp"), "", bytes.NewBuffer([]byte("cpu time_idle=42")))
require.NoError(t, err)
resp.Body.Close()
require.Equal(t, 204, resp.StatusCode)

expected := []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"rp": "myrp",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
}

acc.Wait(1)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}

// http listener should add a newline at the end of the buffer if it's not there
func TestWriteNoNewline(t *testing.T) {
listener := newTestListener()
Expand Down

0 comments on commit 183715d

Please sign in to comment.