From 920a51a8f98e5f763bde351575d9b7d44d48e73e Mon Sep 17 00:00:00 2001 From: Sebastian Spaink <3441183+sspaink@users.noreply.github.com> Date: Fri, 18 Dec 2020 15:41:39 -0600 Subject: [PATCH] Allow globs (wildcards) in config for tags/fields in enum processor (#8598) * Allow glob in enum processor config * change assert to require (cherry picked from commit 50265d9023e3104c3656ec9412f93b800fb9c8c4) --- plugins/processors/enum/README.md | 4 +- plugins/processors/enum/enum.go | 98 ++++++++++++++++++++-------- plugins/processors/enum/enum_test.go | 61 +++++++++++++---- 3 files changed, 122 insertions(+), 41 deletions(-) diff --git a/plugins/processors/enum/README.md b/plugins/processors/enum/README.md index 72a0556252902..651e58e7d2fce 100644 --- a/plugins/processors/enum/README.md +++ b/plugins/processors/enum/README.md @@ -14,10 +14,10 @@ source tag or field is overwritten. ```toml [[processors.enum]] [[processors.enum.mapping]] - ## Name of the field to map + ## Name of the field to map. Globs accepted. field = "status" - ## Name of the tag to map + ## Name of the tag to map. Globs accepted. # tag = "status" ## Destination tag or field to be used for the mapped value. By default the diff --git a/plugins/processors/enum/enum.go b/plugins/processors/enum/enum.go index a96e7d5095bcf..60a4264528844 100644 --- a/plugins/processors/enum/enum.go +++ b/plugins/processors/enum/enum.go @@ -5,15 +5,16 @@ import ( "strconv" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/plugins/processors" ) var sampleConfig = ` [[processors.enum.mapping]] - ## Name of the field to map + ## Name of the field to map. Globs accepted. field = "status" - ## Name of the tag to map + ## Name of the tag to map. Globs accepted. # tag = "status" ## Destination tag or field to be used for the mapped value. By default the @@ -34,6 +35,9 @@ var sampleConfig = ` type EnumMapper struct { Mappings []Mapping `toml:"mapping"` + + FieldFilters map[string]filter.Filter + TagFilters map[string]filter.Filter } type Mapping struct { @@ -44,6 +48,29 @@ type Mapping struct { ValueMappings map[string]interface{} } +func (mapper *EnumMapper) Init() error { + mapper.FieldFilters = make(map[string]filter.Filter) + mapper.TagFilters = make(map[string]filter.Filter) + for _, mapping := range mapper.Mappings { + if mapping.Field != "" { + fieldFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Field}, nil) + if err != nil { + return fmt.Errorf("Failed to create new field filter: %w", err) + } + mapper.FieldFilters[mapping.Field] = fieldFilter + } + if mapping.Tag != "" { + tagFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Tag}, nil) + if err != nil { + return fmt.Errorf("Failed to create new tag filter: %s", err) + } + mapper.TagFilters[mapping.Tag] = tagFilter + } + } + + return nil +} + func (mapper *EnumMapper) SampleConfig() string { return sampleConfig } @@ -60,30 +87,56 @@ func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric { } func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric { + newFields := make(map[string]interface{}) + newTags := make(map[string]string) + for _, mapping := range mapper.Mappings { if mapping.Field != "" { - if originalValue, isPresent := metric.GetField(mapping.Field); isPresent { - if adjustedValue, isString := adjustValue(originalValue).(string); isString { - if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent { - writeField(metric, mapping.getDestination(), mappedValue) - } + mapper.fieldMapping(metric, mapping, newFields) + } + if mapping.Tag != "" { + mapper.tagMapping(metric, mapping, newTags) + } + } + + for k, v := range newFields { + writeField(metric, k, v) + } + + for k, v := range newTags { + writeTag(metric, k, v) + } + + return metric +} + +func (mapper *EnumMapper) fieldMapping(metric telegraf.Metric, mapping Mapping, newFields map[string]interface{}) { + fields := metric.FieldList() + for _, f := range fields { + if mapper.FieldFilters[mapping.Field].Match(f.Key) { + if adjustedValue, isString := adjustValue(f.Value).(string); isString { + if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent { + newFields[mapping.getDestination(f.Key)] = mappedValue } } } - if mapping.Tag != "" { - if originalValue, isPresent := metric.GetTag(mapping.Tag); isPresent { - if mappedValue, isMappedValuePresent := mapping.mapValue(originalValue); isMappedValuePresent { - switch val := mappedValue.(type) { - case string: - writeTag(metric, mapping.getDestinationTag(), val) - default: - writeTag(metric, mapping.getDestinationTag(), fmt.Sprintf("%v", val)) - } + } +} + +func (mapper *EnumMapper) tagMapping(metric telegraf.Metric, mapping Mapping, newTags map[string]string) { + tags := metric.TagList() + for _, t := range tags { + if mapper.TagFilters[mapping.Tag].Match(t.Key) { + if mappedValue, isMappedValuePresent := mapping.mapValue(t.Value); isMappedValuePresent { + switch val := mappedValue.(type) { + case string: + newTags[mapping.getDestination(t.Key)] = val + default: + newTags[mapping.getDestination(t.Key)] = fmt.Sprintf("%v", val) } } } } - return metric } func adjustValue(in interface{}) interface{} { @@ -109,18 +162,11 @@ func (mapping *Mapping) mapValue(original string) (interface{}, bool) { return original, false } -func (mapping *Mapping) getDestination() string { - if mapping.Dest != "" { - return mapping.Dest - } - return mapping.Field -} - -func (mapping *Mapping) getDestinationTag() string { +func (mapping *Mapping) getDestination(defaultDest string) string { if mapping.Dest != "" { return mapping.Dest } - return mapping.Tag + return defaultDest } func writeField(metric telegraf.Metric, name string, value interface{}) { diff --git a/plugins/processors/enum/enum_test.go b/plugins/processors/enum/enum_test.go index de13aad156f5c..21b89d241a2a2 100644 --- a/plugins/processors/enum/enum_test.go +++ b/plugins/processors/enum/enum_test.go @@ -7,17 +7,22 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func createTestMetric() telegraf.Metric { metric, _ := metric.New("m1", - map[string]string{"tag": "tag_value"}, + map[string]string{ + "tag": "tag_value", + "duplicate_tag": "tag_value", + }, map[string]interface{}{ - "string_value": "test", - "int_value": int(200), - "uint_value": uint(500), - "float_value": float64(3.14), - "true_value": true, + "string_value": "test", + "duplicate_string_value": "test", + "int_value": int(200), + "uint_value": uint(500), + "float_value": float64(3.14), + "true_value": true, }, time.Now(), ) @@ -48,6 +53,8 @@ func assertTagValue(t *testing.T, expected interface{}, tag string, tags map[str func TestRetainsMetric(t *testing.T) { mapper := EnumMapper{} + err := mapper.Init() + require.Nil(t, err) source := createTestMetric() target := mapper.Apply(source)[0] @@ -64,7 +71,8 @@ func TestRetainsMetric(t *testing.T) { func TestMapsSingleStringValueTag(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Tag: "tag", ValueMappings: map[string]interface{}{"tag_value": "valuable"}}}} - + err := mapper.Init() + require.Nil(t, err) tags := calculateProcessedTags(mapper, createTestMetric()) assertTagValue(t, "valuable", "tag", tags) @@ -72,7 +80,8 @@ func TestMapsSingleStringValueTag(t *testing.T) { func TestNoFailureOnMappingsOnNonSupportedValuedFields(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Field: "float_value", ValueMappings: map[string]interface{}{"3.14": "pi"}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, float64(3.14), "float_value", fields) @@ -110,6 +119,8 @@ func TestMappings(t *testing.T) { field_name := mapping["field_name"][0].(string) for index := range mapping["target_value"] { mapper := EnumMapper{Mappings: []Mapping{{Field: field_name, ValueMappings: map[string]interface{}{mapping["target_value"][index].(string): mapping["mapped_value"][index]}}}} + err := mapper.Init() + assert.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, mapping["expected_value"][index], field_name, fields) } @@ -118,7 +129,8 @@ func TestMappings(t *testing.T) { func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, 42, "string_value", fields) @@ -126,7 +138,8 @@ func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) { func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, 1, "string_value", fields) @@ -134,7 +147,8 @@ func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) { func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, "test", "string_value", fields) @@ -142,7 +156,8 @@ func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) { func TestWritesToDestination(t *testing.T) { mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, "test", "string_value", fields) @@ -152,10 +167,30 @@ func TestWritesToDestination(t *testing.T) { func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) { field := "string_code" mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: field, ValueMappings: map[string]interface{}{"other": int64(1)}}}} - + err := mapper.Init() + require.Nil(t, err) fields := calculateProcessedValues(mapper, createTestMetric()) assertFieldValue(t, "test", "string_value", fields) _, present := fields[field] assert.False(t, present, "value of field '"+field+"' was present") } + +func TestFieldGlobMatching(t *testing.T) { + mapper := EnumMapper{Mappings: []Mapping{{Field: "*", ValueMappings: map[string]interface{}{"test": "glob"}}}} + err := mapper.Init() + require.Nil(t, err) + fields := calculateProcessedValues(mapper, createTestMetric()) + + assertFieldValue(t, "glob", "string_value", fields) + assertFieldValue(t, "glob", "duplicate_string_value", fields) +} + +func TestTagGlobMatching(t *testing.T) { + mapper := EnumMapper{Mappings: []Mapping{{Tag: "*", ValueMappings: map[string]interface{}{"tag_value": "glob"}}}} + err := mapper.Init() + require.Nil(t, err) + tags := calculateProcessedTags(mapper, createTestMetric()) + + assertTagValue(t, "glob", "tag", tags) +}