Skip to content

Commit

Permalink
fix: apply requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
zpriddy committed Jan 28, 2022
1 parent 19871a9 commit 8f5dcd6
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 27 deletions.
38 changes: 15 additions & 23 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string `toml:"index_name"`
DefaultTagValue string `toml:"default_tag_value"`
TagKeys []string
tagKeys []string
Username string `toml:"username"`
Password string `toml:"password"`
AuthBearerToken string `toml:"auth_bearer_token"`
Expand All @@ -37,13 +37,13 @@ type Elasticsearch struct {
TemplateName string `toml:"template_name"`
OverwriteTemplate bool `toml:"overwrite_template"`
ForceDocumentID bool `toml:"force_document_id"`
MajorReleaseNumber int
majorReleaseNumber int
FloatHandling string `toml:"float_handling"`
FloatReplacement float64 `toml:"float_replacement_value"`
UsePipeline string `toml:"use_pipeline"`
DefaultPipeline string `toml:"default_pipeline"`
PipelineTagKeys []string
PipelineName string
pipelineTagKeys []string
pipelineName string
Log telegraf.Logger `toml:"-"`
tls.ClientConfig

Expand Down Expand Up @@ -285,7 +285,7 @@ func (a *Elasticsearch) Connect() error {
a.Log.Infof("Elasticsearch version: %q", esVersion)

a.Client = client
a.MajorReleaseNumber = majorReleaseNumber
a.majorReleaseNumber = majorReleaseNumber

if a.ManageTemplate {
err := a.manageTemplate(ctx)
Expand All @@ -294,8 +294,8 @@ func (a *Elasticsearch) Connect() error {
}
}

a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
a.PipelineName, a.PipelineTagKeys = a.GetTagKeys(a.UsePipeline)
a.IndexName, a.tagKeys = a.GetTagKeys(a.IndexName)
a.pipelineName, a.pipelineTagKeys = a.GetTagKeys(a.UsePipeline)

return nil
}
Expand Down Expand Up @@ -324,7 +324,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {

// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.tagKeys, metric.Tags())

// Handle NaN and inf field-values
fields := make(map[string]interface{})
Expand Down Expand Up @@ -359,12 +359,12 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
br.Id(id)
}

if a.MajorReleaseNumber <= 6 {
if a.majorReleaseNumber <= 6 {
br.Type("metrics")
}

if a.UsePipeline != "" {
if pipelineName := a.getPipelineName(a.PipelineName, a.PipelineTagKeys, metric.Tags()); pipelineName != "" {
if pipelineName := a.getPipelineName(a.pipelineName, a.pipelineTagKeys, metric.Tags()); pipelineName != "" {
br.Pipeline(pipelineName)
}
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
tp := templatePart{
TemplatePattern: templatePattern + "*",
Version: a.MajorReleaseNumber,
Version: a.majorReleaseNumber,
}

t := template.Must(template.New("template").Parse(telegrafTemplate))
Expand Down Expand Up @@ -497,29 +497,21 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK
}

func (a *Elasticsearch) getPipelineName(pipelineInput string, tagKeys []string, metricTags map[string]string) string {
if !strings.Contains(pipelineInput, "%") {
if !strings.Contains(pipelineInput, "%") || len(tagKeys) == 0 {
return pipelineInput
}

var (
tagValues []interface{}
useDefaultPipeline bool
)
var tagValues []interface{}

for _, key := range tagKeys {
if value, ok := metricTags[key]; ok {
tagValues = append(tagValues, value)
} else {
a.Log.Debugf("Tag %s not found, reverting to default pipeline instead.", key)
useDefaultPipeline = true
return a.DefaultPipeline
}
}

if len(tagValues) > 0 && !useDefaultPipeline {
return fmt.Sprintf(pipelineInput, tagValues...)
}

return a.DefaultPipeline
return fmt.Sprintf(pipelineInput, tagValues...)
}

func getISOWeek(eventTime time.Time) string {
Expand Down
95 changes: 91 additions & 4 deletions plugins/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestGetPipelineName(t *testing.T) {
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
}
e.PipelineName, e.PipelineTagKeys = e.GetTagKeys(e.UsePipeline)
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

tests := []struct {
EventTime time.Time
Expand Down Expand Up @@ -452,22 +452,109 @@ func TestGetPipelineName(t *testing.T) {
},
}
for _, test := range tests {
pipelineName := e.getPipelineName(e.PipelineName, e.PipelineTagKeys, test.Tags)
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}

// Setup testing for testing no pipeline set. All the tests in this case should return "".
e = &Elasticsearch{
Log: testutil.Logger{},
}
e.PipelineName, e.PipelineTagKeys = e.GetTagKeys(e.UsePipeline)
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)

for _, test := range tests {
pipelineName := e.getPipelineName(e.PipelineName, e.PipelineTagKeys, test.Tags)
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, "", pipelineName)
}
}

func TestPipelineConfigs(t *testing.T) {
tests := []struct {
EventTime time.Time
Tags map[string]string
PipelineTagKeys []string
Expected string
Elastic *Elasticsearch
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"},
[]string{},
"myDefaultPipeline",
&Elasticsearch{
UsePipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"",
&Elasticsearch{
DefaultPipeline: "myDefaultPipeline",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"pipeline2",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "es-pipeline": "pipeline2"},
[]string{},
"value1-pipeline2",
&Elasticsearch{
UsePipeline: "{{tag1}}-{{es-pipeline}}",
Log: testutil.Logger{},
},
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1"},
[]string{},
"",
&Elasticsearch{
UsePipeline: "{{es-pipeline}}",
Log: testutil.Logger{},
},
},
}

for _, test := range tests {
e := test.Elastic
e.pipelineName, e.pipelineTagKeys = e.GetTagKeys(e.UsePipeline)
pipelineName := e.getPipelineName(e.pipelineName, e.pipelineTagKeys, test.Tags)
require.Equal(t, test.Expected, pipelineName)
}
}

func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
Expand Down

0 comments on commit 8f5dcd6

Please sign in to comment.