Skip to content

Commit

Permalink
Add support for setting retention policy using tag (#7141)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Mar 10, 2020
1 parent c50b02e commit c7146be
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 82 deletions.
7 changes: 7 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## the default retention policy. Only takes effect when using HTTP.
# retention_policy = ""

## The value of this tag will be used to determine the retention policy. If this
## tag is not set the 'retention_policy' option is used as the default.
# retention_policy_tag = ""

## If true, the 'retention_policy_tag' will not be removed from the metric.
# exclude_retention_policy_tag = false

## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
## Only takes effect when using HTTP.
# write_consistency = "any"
Expand Down
109 changes: 61 additions & 48 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,23 @@ func (r WriteResponse) Error() string {
}

type HTTPConfig struct {
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
ExcludeDatabaseTag bool
RetentionPolicy string
Consistency string
SkipDatabaseCreation bool
URL *url.URL
UserAgent string
Timeout time.Duration
Username string
Password string
TLSConfig *tls.Config
Proxy *url.URL
Headers map[string]string
ContentEncoding string
Database string
DatabaseTag string
ExcludeDatabaseTag bool
RetentionPolicy string
RetentionPolicyTag string
ExcludeRetentionPolicyTag bool
Consistency string
SkipDatabaseCreation bool

InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
Expand Down Expand Up @@ -236,55 +238,66 @@ func (c *httpClient) CreateDatabase(ctx context.Context, database string) error
}
}

type dbrp struct {
Database string
RetentionPolicy string
}

// Write sends the metrics to InfluxDB
func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
batches := make(map[string][]telegraf.Metric)
if c.config.DatabaseTag == "" {
err := c.writeBatch(ctx, c.config.Database, metrics)
if err != nil {
return err
// If these options are not used, we can skip in plugin batching and send
// the full batch in a single request.
if c.config.DatabaseTag == "" && c.config.RetentionPolicyTag == "" {
return c.writeBatch(ctx, c.config.Database, c.config.RetentionPolicy, metrics)
}

batches := make(map[dbrp][]telegraf.Metric)
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}
} else {
for _, metric := range metrics {
db, ok := metric.GetTag(c.config.DatabaseTag)
if !ok {
db = c.config.Database
}

if _, ok := batches[db]; !ok {
batches[db] = make([]telegraf.Metric, 0)
}
rp, ok := metric.GetTag(c.config.RetentionPolicyTag)
if !ok {
rp = c.config.RetentionPolicy
}

if c.config.ExcludeDatabaseTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.config.DatabaseTag)
}
dbrp := dbrp{
Database: db,
RetentionPolicy: rp,
}

batches[db] = append(batches[db], metric)
if c.config.ExcludeDatabaseTag || c.config.ExcludeRetentionPolicyTag {
// Avoid modifying the metric in case we need to retry the request.
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.config.DatabaseTag)
metric.RemoveTag(c.config.RetentionPolicyTag)
}

for db, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[db] {
err := c.CreateDatabase(ctx, db)
if err != nil {
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
c.config.URL, db, err)
}
}
batches[dbrp] = append(batches[dbrp], metric)
}

err := c.writeBatch(ctx, db, batch)
for dbrp, batch := range batches {
if !c.config.SkipDatabaseCreation && !c.createdDatabases[dbrp.Database] {
err := c.CreateDatabase(ctx, dbrp.Database)
if err != nil {
return err
c.log.Warnf("When writing to [%s]: database %q creation failed: %v",
c.config.URL, dbrp.Database, err)
}
}

err := c.writeBatch(ctx, dbrp.Database, dbrp.RetentionPolicy, batch)
if err != nil {
return err
}
}
return nil
}

func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
func (c *httpClient) writeBatch(ctx context.Context, db, rp string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, rp, c.config.Consistency)
if err != nil {
return err
}
Expand Down
197 changes: 197 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,200 @@ func TestHTTP_WriteDatabaseTagWorksOnRetry(t *testing.T) {
err = client.Write(ctx, metrics)
require.NoError(t, err)
}

func TestDBRPTags(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

tests := []struct {
name string
config influxdb.HTTPConfig
metrics []telegraf.Metric
handlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
url string
}{
{
name: "defaults",
config: influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "static retention policy",
config: influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
RetentionPolicy: "foo",
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"rp": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu,rp=foo value=42")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag fallback to static rp",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicy: "foo",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "retention policy tag fallback to unset rp",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "")
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "exclude retention policy tag",
config: influxdb.HTTPConfig{
URL: u,
SkipDatabaseCreation: true,
Database: "telegraf",
RetentionPolicyTag: "rp",
ExcludeRetentionPolicyTag: true,
Log: testutil.Logger{},
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"rp": "foo",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
handlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.FormValue("db"), "telegraf")
require.Equal(t, r.FormValue("rp"), "foo")
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
require.Contains(t, string(body), "cpu value=42")
w.WriteHeader(http.StatusNoContent)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
tt.handlerFunc(t, w, r)
return
default:
w.WriteHeader(http.StatusNotFound)
return
}
})

client, err := influxdb.NewHTTPClient(tt.config)
require.NoError(t, err)

ctx := context.Background()
err = client.Write(ctx, tt.metrics)
require.NoError(t, err)
})
}
}
Loading

0 comments on commit c7146be

Please sign in to comment.