From 763164145f7e89b96776ca0c134415bb832e6654 Mon Sep 17 00:00:00 2001 From: Zachary Priddy Date: Thu, 17 Feb 2022 15:00:26 -0800 Subject: [PATCH] feat[elastic output]: add elastic pipeline flags (#10505) --- plugins/outputs/elasticsearch/README.md | 12 ++ .../outputs/elasticsearch/elasticsearch.go | 81 +++++++--- .../elasticsearch/elasticsearch_test.go | 143 ++++++++++++++++++ 3 files changed, 215 insertions(+), 21 deletions(-) diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md index 9dc41fb39451a..7579c87c0c47c 100644 --- a/plugins/outputs/elasticsearch/README.md +++ b/plugins/outputs/elasticsearch/README.md @@ -247,6 +247,16 @@ POST https://es.us-east-1.amazonaws.com/2021-01-01/opensearch/upgradeDomain ## NaNs and inf will be replaced with the given number, -inf with the negative of that number # float_handling = "none" # float_replacement_value = 0.0 + + ## Pipeline Config + ## To use a ingest pipeline, set this to the name of the pipeline you want to use. + # use_pipeline = "my_pipeline" + ## Additionally, you can specify a tag name using the notation {{tag_name}} + ## which will be used as part of the pipeline name. If the tag does not exist, + ## the default pipeline will be used as the pipeline. If no default pipeline is set, + ## no pipeline is used for the metric. + # use_pipeline = "{{es_pipeline}}" + # default_pipeline = "my_pipeline" ``` ### Permissions @@ -286,6 +296,8 @@ Additionally, you can specify dynamic index names by using tags with the notatio * `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents. * `float_handling`: Specifies how to handle `NaN` and infinite field values. `"none"` (default) will do nothing, `"drop"` will drop the field and `replace` will replace the field value by the number in `float_replacement_value` * `float_replacement_value`: Value (defaulting to `0.0`) to replace `NaN`s and `inf`s if `float_handling` is set to `replace`. Negative `inf` will be replaced by the negative value in this number to respect the sign of the field's original value. +* `use_pipeline`: If set, the set value will be used as the pipeline to call when sending events to elasticsearch. Additionally, you can specify dynamic pipeline names by using tags with the notation ```{{tag_name}}```. If the tag does not exist in a particular metric, the `default_pipeline` will be used instead. +* `default_pipeline`: If dynamic pipeline names the tag does not exist in a particular metric, this value will be used instead. ## Known issues diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 7f6066b05ecdb..2eb21f092591d 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -23,25 +23,29 @@ import ( ) type Elasticsearch struct { - URLs []string `toml:"urls"` - IndexName string - DefaultTagValue string - TagKeys []string - Username string - Password string - AuthBearerToken string - EnableSniffer bool - Timeout config.Duration - HealthCheckInterval config.Duration - EnableGzip bool - ManageTemplate bool - TemplateName string - OverwriteTemplate bool - ForceDocumentID bool `toml:"force_document_id"` - MajorReleaseNumber int + AuthBearerToken string `toml:"auth_bearer_token"` + DefaultPipeline string `toml:"default_pipeline"` + DefaultTagValue string `toml:"default_tag_value"` + EnableGzip bool `toml:"enable_gzip"` + EnableSniffer bool `toml:"enable_sniffer"` FloatHandling string `toml:"float_handling"` FloatReplacement float64 `toml:"float_replacement_value"` + ForceDocumentID bool `toml:"force_document_id"` + HealthCheckInterval config.Duration `toml:"health_check_interval"` + IndexName string `toml:"index_name"` + ManageTemplate bool `toml:"manage_template"` + OverwriteTemplate bool `toml:"overwrite_template"` + Password string `toml:"password"` + TemplateName string `toml:"template_name"` + Timeout config.Duration `toml:"timeout"` + URLs []string `toml:"urls"` + UsePipeline string `toml:"use_pipeline"` + Username string `toml:"username"` Log telegraf.Logger `toml:"-"` + majorReleaseNumber int + pipelineName string + pipelineTagKeys []string + tagKeys []string tls.ClientConfig Client *elastic.Client @@ -112,6 +116,16 @@ var sampleConfig = ` ## NaNs and inf will be replaced with the given number, -inf with the negative of that number # float_handling = "none" # float_replacement_value = 0.0 + + ## Pipeline Config + ## To use a ingest pipeline, set this to the name of the pipeline you want to use. + # use_pipeline = "my_pipeline" + ## Additionally, you can specify a tag name using the notation {{tag_name}} + ## which will be used as part of the pipeline name. If the tag does not exist, + ## the default pipeline will be used as the pipeline. If no default pipeline is set, + ## no pipeline is used for the metric. + # use_pipeline = "{{es_pipeline}}" + # default_pipeline = "my_pipeline" ` const telegrafTemplate = ` @@ -278,7 +292,7 @@ func (a *Elasticsearch) Connect() error { a.Log.Infof("Elasticsearch version: %q", esVersion) a.Client = client - a.MajorReleaseNumber = majorReleaseNumber + a.majorReleaseNumber = majorReleaseNumber if a.ManageTemplate { err := a.manageTemplate(ctx) @@ -287,7 +301,8 @@ func (a *Elasticsearch) Connect() error { } } - a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName) + a.IndexName, a.tagKeys = a.GetTagKeys(a.IndexName) + a.pipelineName, a.pipelineTagKeys = a.GetTagKeys(a.UsePipeline) return nil } @@ -316,7 +331,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { // index name has to be re-evaluated each time for telegraf // to send the metric to the correct time-based index - indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags()) + indexName := a.GetIndexName(a.IndexName, metric.Time(), a.tagKeys, metric.Tags()) // Handle NaN and inf field-values fields := make(map[string]interface{}) @@ -351,10 +366,16 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { br.Id(id) } - if a.MajorReleaseNumber <= 6 { + if a.majorReleaseNumber <= 6 { br.Type("metrics") } + if a.UsePipeline != "" { + if pipelineName := a.getPipelineName(a.pipelineName, a.pipelineTagKeys, metric.Tags()); pipelineName != "" { + br.Pipeline(pipelineName) + } + } + bulkRequest.Add(br) } @@ -406,7 +427,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") { tp := templatePart{ TemplatePattern: templatePattern + "*", - Version: a.MajorReleaseNumber, + Version: a.majorReleaseNumber, } t := template.Must(template.New("template").Parse(telegrafTemplate)) @@ -482,6 +503,24 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK return fmt.Sprintf(indexName, tagValues...) } +func (a *Elasticsearch) getPipelineName(pipelineInput string, tagKeys []string, metricTags map[string]string) string { + if !strings.Contains(pipelineInput, "%") || len(tagKeys) == 0 { + return pipelineInput + } + + var tagValues []interface{} + + for _, key := range tagKeys { + if value, ok := metricTags[key]; ok { + tagValues = append(tagValues, value) + continue + } + a.Log.Debugf("Tag %s not found, reverting to default pipeline instead.", key) + return a.DefaultPipeline + } + return fmt.Sprintf(pipelineInput, tagValues...) +} + func getISOWeek(eventTime time.Time) string { _, week := eventTime.ISOWeek() return strconv.Itoa(week) diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index 5cb2ecb2a73b9..e8c67124ee678 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -412,6 +412,149 @@ func TestGetIndexName(t *testing.T) { } } +func TestGetPipelineName(t *testing.T) { + e := &Elasticsearch{ + UsePipeline: "{{es-pipeline}}", + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + } + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + + tests := []struct { + EventTime time.Time + Tags map[string]string + PipelineTagKeys []string + Expected string + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "myDefaultPipeline", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "myDefaultPipeline", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"}, + []string{}, + "myOtherPipeline", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "pipeline2", + }, + } + for _, test := range tests { + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, test.Expected, pipelineName) + } + + // Setup testing for testing no pipeline set. All the tests in this case should return "". + e = &Elasticsearch{ + Log: testutil.Logger{}, + } + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + + for _, test := range tests { + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, "", pipelineName) + } +} + +func TestPipelineConfigs(t *testing.T) { + tests := []struct { + EventTime time.Time + Tags map[string]string + PipelineTagKeys []string + Expected string + Elastic *Elasticsearch + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "", + &Elasticsearch{ + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + "", + &Elasticsearch{ + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"}, + []string{}, + "myDefaultPipeline", + &Elasticsearch{ + UsePipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "", + &Elasticsearch{ + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "pipeline2", + &Elasticsearch{ + UsePipeline: "{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"}, + []string{}, + "value1-pipeline2", + &Elasticsearch{ + UsePipeline: "{{tag1}}-{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1"}, + []string{}, + "", + &Elasticsearch{ + UsePipeline: "{{es-pipeline}}", + Log: testutil.Logger{}, + }, + }, + } + + for _, test := range tests { + e := test.Elastic + e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline) + pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags) + require.Equal(t, test.Expected, pipelineName) + } +} + func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path {