diff --git a/docker-compose.yml b/docker-compose.yml index a5991434bc16e..bce3f4922306b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,12 +24,13 @@ services: depends_on: - zookeeper elasticsearch: - image: elasticsearch:5 + image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0 environment: - - JAVA_OPTS="-Xms256m -Xmx256m" + - "ES_JAVA_OPTS=-Xms256m -Xmx256m" + - discovery.type=single-node + - xpack.security.enabled=false ports: - "9200:9200" - - "9300:9300" mysql: image: mysql environment: diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 11f3c1385fd5c..2ba46c87ef443 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -1,8 +1,8 @@ -## Elasticsearch Output Plugin for Telegraf +# Elasticsearch Output Plugin for Telegraf -This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (http://olivere.github.io/elastic/). +This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic ( -Currently it only supports Elasticsearch 5.x series. +It supports Elasticsearch releases from 5.x up to 7.x. ## Elasticsearch indexes and templates @@ -22,7 +22,7 @@ For more information on how this works, see https://www.elastic.co/guide/en/elas This plugin can create a working template for use with telegraf metrics. It uses Elasticsearch dynamic templates feature to set proper types for the tags and metrics fields. If the template specified already exists, it will not overwrite unless you configure this plugin to do so. Thus you can customize this template after its creation if necessary. -Example of an index template created by telegraf: +Example of an index template created by telegraf on Elasticsearch 5.x: ```json { @@ -35,6 +35,8 @@ Example of an index template created by telegraf: "limit": "5000" } }, + "auto_expand_replicas" : "0-1", + "codec" : "best_compression", "refresh_interval": "10s" } }, @@ -159,7 +161,7 @@ This plugin will format the events in the following way: ## Set the interval to check if the Elasticsearch nodes are available ## Setting to "0s" will disable the health check (not recommended in production) health_check_interval = "10s" - ## HTTP basic authentication details (eg. when using Shield) + ## HTTP basic authentication details # username = "telegraf" # password = "mypassword" @@ -209,6 +211,7 @@ This plugin will format the events in the following way: %H - hour (00..23) %V - week of the year (ISO week) (01..53) ``` + Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead. ### Optional parameters: diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 56169135ac3be..7c4d4755a19d5 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -1,12 +1,14 @@ package elasticsearch import ( + "bytes" "context" "fmt" "log" "net/http" "strconv" "strings" + "text/template" "time" "github.com/influxdata/telegraf" @@ -29,6 +31,7 @@ type Elasticsearch struct { ManageTemplate bool TemplateName string OverwriteTemplate bool + MajorReleaseNumber int tls.ClientConfig Client *elastic.Client @@ -47,7 +50,7 @@ var sampleConfig = ` ## Set the interval to check if the Elasticsearch nodes are available ## Setting to "0s" will disable the health check (not recommended in production) health_check_interval = "10s" - ## HTTP basic authentication details (eg. when using Shield) + ## HTTP basic authentication details # username = "telegraf" # password = "mypassword" @@ -85,6 +88,81 @@ var sampleConfig = ` overwrite_template = false ` +const telegrafTemplate = ` +{ + {{ if (lt .Version 6) }} + "template": "{{.TemplatePattern}}", + {{ else }} + "index_patterns" : [ "{{.TemplatePattern}}" ], + {{ end }} + "settings": { + "index": { + "refresh_interval": "10s", + "mapping.total_fields.limit": 5000, + "auto_expand_replicas" : "0-1", + "codec" : "best_compression" + } + }, + "mappings" : { + {{ if (lt .Version 7) }} + "metrics" : { + {{ if (lt .Version 6) }} + "_all": { "enabled": false }, + {{ end }} + {{ end }} + "properties" : { + "@timestamp" : { "type" : "date" }, + "measurement_name" : { "type" : "keyword" } + }, + "dynamic_templates": [ + { + "tags": { + "match_mapping_type": "string", + "path_match": "tag.*", + "mapping": { + "ignore_above": 512, + "type": "keyword" + } + } + }, + { + "metrics_long": { + "match_mapping_type": "long", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "metrics_double": { + "match_mapping_type": "double", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "text_fields": { + "match": "*", + "mapping": { + "norms": false + } + } + } + ] + {{ if (lt .Version 7) }} + } + {{ end }} + } +}` + +type templatePart struct { + TemplatePattern string + Version int +} + func (a *Elasticsearch) Connect() error { if a.URLs == nil || a.IndexName == "" { return fmt.Errorf("Elasticsearch urls or index_name is not defined") @@ -142,14 +220,15 @@ func (a *Elasticsearch) Connect() error { } // quit if ES version is not supported - i, err := strconv.Atoi(strings.Split(esVersion, ".")[0]) - if err != nil || i < 5 { + majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0]) + if err != nil || majorReleaseNumber < 5 { return fmt.Errorf("Elasticsearch version not supported: %s", esVersion) } log.Println("I! Elasticsearch version: " + esVersion) a.Client = client + a.MajorReleaseNumber = majorReleaseNumber if a.ManageTemplate { err := a.manageTemplate(ctx) @@ -184,10 +263,13 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { m["tag"] = metric.Tags() m[name] = metric.Fields() - bulkRequest.Add(elastic.NewBulkIndexRequest(). - Index(indexName). - Type("metrics"). - Doc(m)) + br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m) + + if a.MajorReleaseNumber <= 6 { + br.Type("metrics") + } + + bulkRequest.Add(br) } @@ -237,65 +319,16 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { } if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") { - // Create or update the template - tmpl := fmt.Sprintf(` - { - "template":"%s", - "settings": { - "index": { - "refresh_interval": "10s", - "mapping.total_fields.limit": 5000 - } - }, - "mappings" : { - "_default_" : { - "_all": { "enabled": false }, - "properties" : { - "@timestamp" : { "type" : "date" }, - "measurement_name" : { "type" : "keyword" } - }, - "dynamic_templates": [ - { - "tags": { - "match_mapping_type": "string", - "path_match": "tag.*", - "mapping": { - "ignore_above": 512, - "type": "keyword" - } - } - }, - { - "metrics_long": { - "match_mapping_type": "long", - "mapping": { - "type": "float", - "index": false - } - } - }, - { - "metrics_double": { - "match_mapping_type": "double", - "mapping": { - "type": "float", - "index": false - } - } - }, - { - "text_fields": { - "match": "*", - "mapping": { - "norms": false - } - } - } - ] - } - } - }`, templatePattern+"*") - _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx) + tp := templatePart{ + TemplatePattern: templatePattern + "*", + Version: a.MajorReleaseNumber, + } + + t := template.Must(template.New("template").Parse(telegrafTemplate)) + var tmpl bytes.Buffer + + t.Execute(&tmpl, tp) + _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx) if errCreateTemplate != nil { return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)