Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ES output: allow to use tag values on index name #3470

Merged
merged 8 commits into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ package elasticsearch
import (
"context"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v5"
"log"
"net/http"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v5"
)

type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
DefaultTagValue string
Username string
Password string
EnableSniffer bool
Expand All @@ -37,7 +39,7 @@ var sampleConfig = `
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Elasticsearch client timeout, defaults to "5s" if not set.
## Elasticsearch client timeout, defaults to "5s" if not set.
timeout = "5s"
## Set to true to ask Elasticsearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option.
Expand All @@ -58,6 +60,11 @@ var sampleConfig = `
# %m - month (01..12)
# %d - day of month (e.g., 01)
# %H - hour (00..23)
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the index name. If the tag does not exist,
## the default tag value will be used.
# index_name = "telegraf-{{host}}-%Y.%m.%d"
# default_tag_value = "none"
index_name = "telegraf-%Y.%m.%d" # required.

## Optional SSL Config
Expand Down Expand Up @@ -165,7 +172,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {

// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time())
indexName := a.GetIndexName(a.IndexName, metric.Time(), metric.Tags())

m := make(map[string]interface{})

Expand Down Expand Up @@ -212,13 +219,21 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
}

templatePattern := a.IndexName + "*"
templatePattern := a.IndexName

if strings.Contains(templatePattern, "%") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "%")]
}

if strings.Contains(a.IndexName, "%") {
templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*"
if strings.Contains(templatePattern, "{{") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
}

if (a.OverwriteTemplate) || (!templateExists) {
if templatePattern == "" {
return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix")
}

if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
// Create or update the template
tmpl := fmt.Sprintf(`
{
Expand Down Expand Up @@ -276,7 +291,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
]
}
}
}`, templatePattern)
}`, templatePattern+"*")
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)

if errCreateTemplate != nil {
Expand All @@ -293,7 +308,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return nil
}

func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, metricTags map[string]string) string {
if strings.Contains(indexName, "%") {
var dateReplacer = strings.NewReplacer(
"%Y", eventTime.UTC().Format("2006"),
Expand All @@ -306,6 +321,45 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) stri
indexName = dateReplacer.Replace(indexName)
}

startTag := strings.Index(indexName, "{{")

for startTag >= 0 {

endTag := strings.Index(indexName, "}}")

if endTag < 0 {

startTag = -1

} else {

tagName := indexName[startTag+2 : endTag]
tagNameTrim := strings.TrimSpace(tagName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we trim the space here, then I think the tag may not match further down in the replacer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why I kept the the tagName variable to be used by the replacer instead of trimming on it directly

found := false

for k := range metricTags {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this happens for each metric, I'm concerned about the performance. Perhaps this can be improved with by doing some of the work at Connect time.

I'll try to explain in more detail, this part needs only done once:

  • Make an array of tagkeys: ["host"]
  • Build a format string from the indexName template with the tags replaced with %s: telegraf-%s-%Y.%m.%d

Then at Write time:

  • Lookup the tag values and place them into an array.
  • Use fmt.Sprintf with the format string and the array of tag values
  • Use the existing replacer for the dates.

if k == tagNameTrim {
found = true
var tagReplacer = strings.NewReplacer(
"{{"+tagName+"}}", metricTags[k],
)
indexName = tagReplacer.Replace(indexName)
}
}

if found != true {
log.Printf("D! Tag %s not found, using '%s' instead\n", tagNameTrim, a.DefaultTagValue)
var tagReplacer = strings.NewReplacer(
"{{"+tagName+"}}", a.DefaultTagValue,
)
indexName = tagReplacer.Replace(indexName)

}

startTag = strings.Index(indexName, "{{")
}
}

return indexName

}
Expand Down
63 changes: 60 additions & 3 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func TestConnectAndWrite(t *testing.T) {
}

func TestTemplateManagementEmptyTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

ctx := context.Background()
Expand Down Expand Up @@ -82,49 +86,102 @@ func TestTemplateManagement(t *testing.T) {
require.NoError(t, err)
}

func TestTemplateInvalidIndexPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}

e := &Elasticsearch{
URLs: urls,
IndexName: "{{host}}-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
}

err := e.Connect()
require.Error(t, err)
}

func TestGetIndexName(t *testing.T) {
e := &Elasticsearch{}
e := &Elasticsearch{
DefaultTagValue: "none",
}

var tests = []struct {
EventTime time.Time
Tags map[string]string
IndexName string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname",
"indexname",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-%Y",
"indexname-2014",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-%Y-%m",
"indexname-2014-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-%Y-%m-%d",
"indexname-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-%Y-%m-%d-%H",
"indexname-2014-12-01-23",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-%y-%m",
"indexname-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-{{tag1}}-%y-%m",
"indexname-value1-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-{{tag1}}-{{tag2}}-%y-%m",
"indexname-value1-value2-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-{{tag1}}-{{ tag2 }}-{{tag3}}-%y-%m",
"indexname-value1-value2-none-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
"indexname-{{tag1}}-{{tag2}}-{{tag3}-%y-%m",
"indexname-value1-value2-{{tag3}-14-12",
},
}
for _, test := range tests {
indexName := e.GetIndexName(test.IndexName, test.EventTime)
indexName := e.GetIndexName(test.IndexName, test.EventTime, test.Tags)
if indexName != test.Expected {
t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected)
t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName)
}
}
}