Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch output - Add support up to ES 7.x #6053

Merged
merged 2 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ services:
depends_on:
- zookeeper
elasticsearch:
image: elasticsearch:5
image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0
environment:
- JAVA_OPTS="-Xms256m -Xmx256m"
- "ES_JAVA_OPTS=-Xms256m -Xmx256m"
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
- "9300:9300"
mysql:
image: mysql
environment:
Expand Down
13 changes: 8 additions & 5 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## Elasticsearch Output Plugin for Telegraf
# Elasticsearch Output Plugin for Telegraf

This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (http://olivere.github.io/elastic/).
This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (<http://olivere.github.io/elastic/).>

Currently it only supports Elasticsearch 5.x series.
It supports Elasticsearch releases from 5.x up to 7.x.

## Elasticsearch indexes and templates

Expand All @@ -22,7 +22,7 @@ For more information on how this works, see https://www.elastic.co/guide/en/elas
This plugin can create a working template for use with telegraf metrics. It uses Elasticsearch dynamic templates feature to set proper types for the tags and metrics fields.
If the template specified already exists, it will not overwrite unless you configure this plugin to do so. Thus you can customize this template after its creation if necessary.

Example of an index template created by telegraf:
Example of an index template created by telegraf on Elasticsearch 5.x:

```json
{
Expand All @@ -35,6 +35,8 @@ Example of an index template created by telegraf:
"limit": "5000"
}
},
"auto_expand_replicas" : "0-1",
"codec" : "best_compression",
"refresh_interval": "10s"
}
},
Expand Down Expand Up @@ -159,7 +161,7 @@ This plugin will format the events in the following way:
## Set the interval to check if the Elasticsearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
health_check_interval = "10s"
## HTTP basic authentication details (eg. when using Shield)
## HTTP basic authentication details
# username = "telegraf"
# password = "mypassword"

Expand Down Expand Up @@ -209,6 +211,7 @@ This plugin will format the events in the following way:
%H - hour (00..23)
%V - week of the year (ISO week) (01..53)
```

Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead.

### Optional parameters:
Expand Down
165 changes: 99 additions & 66 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package elasticsearch

import (
"bytes"
"context"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -29,6 +31,7 @@ type Elasticsearch struct {
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
MajorReleaseNumber int
tls.ClientConfig

Client *elastic.Client
Expand All @@ -47,7 +50,7 @@ var sampleConfig = `
## Set the interval to check if the Elasticsearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
health_check_interval = "10s"
## HTTP basic authentication details (eg. when using Shield)
## HTTP basic authentication details
# username = "telegraf"
# password = "mypassword"

Expand Down Expand Up @@ -85,6 +88,81 @@ var sampleConfig = `
overwrite_template = false
`

const telegrafTemplate = `
{
{{ if (lt .Version 6) }}
"template": "{{.TemplatePattern}}",
{{ else }}
"index_patterns" : [ "{{.TemplatePattern}}" ],
{{ end }}
"settings": {
"index": {
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000,
"auto_expand_replicas" : "0-1",
"codec" : "best_compression"
}
},
"mappings" : {
{{ if (lt .Version 7) }}
"metrics" : {
{{ if (lt .Version 6) }}
"_all": { "enabled": false },
{{ end }}
{{ end }}
"properties" : {
"@timestamp" : { "type" : "date" },
"measurement_name" : { "type" : "keyword" }
},
"dynamic_templates": [
{
"tags": {
"match_mapping_type": "string",
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
}
}
},
{
"metrics_long": {
"match_mapping_type": "long",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"metrics_double": {
"match_mapping_type": "double",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"text_fields": {
"match": "*",
"mapping": {
"norms": false
}
}
}
]
{{ if (lt .Version 7) }}
}
{{ end }}
}
}`

type templatePart struct {
TemplatePattern string
Version int
}

func (a *Elasticsearch) Connect() error {
if a.URLs == nil || a.IndexName == "" {
return fmt.Errorf("Elasticsearch urls or index_name is not defined")
Expand Down Expand Up @@ -142,14 +220,15 @@ func (a *Elasticsearch) Connect() error {
}

// quit if ES version is not supported
i, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
if err != nil || i < 5 {
majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
if err != nil || majorReleaseNumber < 5 {
return fmt.Errorf("Elasticsearch version not supported: %s", esVersion)
}

log.Println("I! Elasticsearch version: " + esVersion)

a.Client = client
a.MajorReleaseNumber = majorReleaseNumber

if a.ManageTemplate {
err := a.manageTemplate(ctx)
Expand Down Expand Up @@ -184,10 +263,13 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
m["tag"] = metric.Tags()
m[name] = metric.Fields()

bulkRequest.Add(elastic.NewBulkIndexRequest().
Index(indexName).
Type("metrics").
Doc(m))
br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)

if a.MajorReleaseNumber <= 6 {
br.Type("metrics")
}

bulkRequest.Add(br)

}

Expand Down Expand Up @@ -237,65 +319,16 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
}

if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
// Create or update the template
tmpl := fmt.Sprintf(`
{
"template":"%s",
"settings": {
"index": {
"refresh_interval": "10s",
"mapping.total_fields.limit": 5000
}
},
"mappings" : {
"_default_" : {
"_all": { "enabled": false },
"properties" : {
"@timestamp" : { "type" : "date" },
"measurement_name" : { "type" : "keyword" }
},
"dynamic_templates": [
{
"tags": {
"match_mapping_type": "string",
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
}
}
},
{
"metrics_long": {
"match_mapping_type": "long",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"metrics_double": {
"match_mapping_type": "double",
"mapping": {
"type": "float",
"index": false
}
}
},
{
"text_fields": {
"match": "*",
"mapping": {
"norms": false
}
}
}
]
}
}
}`, templatePattern+"*")
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)
tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.MajorReleaseNumber,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
var tmpl bytes.Buffer

t.Execute(&tmpl, tp)
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx)

if errCreateTemplate != nil {
return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
Expand Down