Skip to content

Commit

Permalink
Allow globs (wildcards) in config for tags/fields in enum processor (#…
Browse files Browse the repository at this point in the history
…8598)

* Allow glob in enum processor config

* change assert to require

(cherry picked from commit 50265d9)
  • Loading branch information
sspaink authored and ssoroka committed Dec 18, 2020
1 parent fa090ec commit 920a51a
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 41 deletions.
4 changes: 2 additions & 2 deletions plugins/processors/enum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ source tag or field is overwritten.
```toml
[[processors.enum]]
[[processors.enum.mapping]]
## Name of the field to map
## Name of the field to map. Globs accepted.
field = "status"

## Name of the tag to map
## Name of the tag to map. Globs accepted.
# tag = "status"

## Destination tag or field to be used for the mapped value. By default the
Expand Down
98 changes: 72 additions & 26 deletions plugins/processors/enum/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"strconv"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/processors"
)

var sampleConfig = `
[[processors.enum.mapping]]
## Name of the field to map
## Name of the field to map. Globs accepted.
field = "status"
## Name of the tag to map
## Name of the tag to map. Globs accepted.
# tag = "status"
## Destination tag or field to be used for the mapped value. By default the
Expand All @@ -34,6 +35,9 @@ var sampleConfig = `

type EnumMapper struct {
Mappings []Mapping `toml:"mapping"`

FieldFilters map[string]filter.Filter
TagFilters map[string]filter.Filter
}

type Mapping struct {
Expand All @@ -44,6 +48,29 @@ type Mapping struct {
ValueMappings map[string]interface{}
}

func (mapper *EnumMapper) Init() error {
mapper.FieldFilters = make(map[string]filter.Filter)
mapper.TagFilters = make(map[string]filter.Filter)
for _, mapping := range mapper.Mappings {
if mapping.Field != "" {
fieldFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Field}, nil)
if err != nil {
return fmt.Errorf("Failed to create new field filter: %w", err)
}
mapper.FieldFilters[mapping.Field] = fieldFilter
}
if mapping.Tag != "" {
tagFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Tag}, nil)
if err != nil {
return fmt.Errorf("Failed to create new tag filter: %s", err)
}
mapper.TagFilters[mapping.Tag] = tagFilter
}
}

return nil
}

func (mapper *EnumMapper) SampleConfig() string {
return sampleConfig
}
Expand All @@ -60,30 +87,56 @@ func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
}

func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
newFields := make(map[string]interface{})
newTags := make(map[string]string)

for _, mapping := range mapper.Mappings {
if mapping.Field != "" {
if originalValue, isPresent := metric.GetField(mapping.Field); isPresent {
if adjustedValue, isString := adjustValue(originalValue).(string); isString {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
writeField(metric, mapping.getDestination(), mappedValue)
}
mapper.fieldMapping(metric, mapping, newFields)
}
if mapping.Tag != "" {
mapper.tagMapping(metric, mapping, newTags)
}
}

for k, v := range newFields {
writeField(metric, k, v)
}

for k, v := range newTags {
writeTag(metric, k, v)
}

return metric
}

func (mapper *EnumMapper) fieldMapping(metric telegraf.Metric, mapping Mapping, newFields map[string]interface{}) {
fields := metric.FieldList()
for _, f := range fields {
if mapper.FieldFilters[mapping.Field].Match(f.Key) {
if adjustedValue, isString := adjustValue(f.Value).(string); isString {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
newFields[mapping.getDestination(f.Key)] = mappedValue
}
}
}
if mapping.Tag != "" {
if originalValue, isPresent := metric.GetTag(mapping.Tag); isPresent {
if mappedValue, isMappedValuePresent := mapping.mapValue(originalValue); isMappedValuePresent {
switch val := mappedValue.(type) {
case string:
writeTag(metric, mapping.getDestinationTag(), val)
default:
writeTag(metric, mapping.getDestinationTag(), fmt.Sprintf("%v", val))
}
}
}

func (mapper *EnumMapper) tagMapping(metric telegraf.Metric, mapping Mapping, newTags map[string]string) {
tags := metric.TagList()
for _, t := range tags {
if mapper.TagFilters[mapping.Tag].Match(t.Key) {
if mappedValue, isMappedValuePresent := mapping.mapValue(t.Value); isMappedValuePresent {
switch val := mappedValue.(type) {
case string:
newTags[mapping.getDestination(t.Key)] = val
default:
newTags[mapping.getDestination(t.Key)] = fmt.Sprintf("%v", val)
}
}
}
}
return metric
}

func adjustValue(in interface{}) interface{} {
Expand All @@ -109,18 +162,11 @@ func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
return original, false
}

func (mapping *Mapping) getDestination() string {
if mapping.Dest != "" {
return mapping.Dest
}
return mapping.Field
}

func (mapping *Mapping) getDestinationTag() string {
func (mapping *Mapping) getDestination(defaultDest string) string {
if mapping.Dest != "" {
return mapping.Dest
}
return mapping.Tag
return defaultDest
}

func writeField(metric telegraf.Metric, name string, value interface{}) {
Expand Down
61 changes: 48 additions & 13 deletions plugins/processors/enum/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]string{
"tag": "tag_value",
"duplicate_tag": "tag_value",
},
map[string]interface{}{
"string_value": "test",
"int_value": int(200),
"uint_value": uint(500),
"float_value": float64(3.14),
"true_value": true,
"string_value": "test",
"duplicate_string_value": "test",
"int_value": int(200),
"uint_value": uint(500),
"float_value": float64(3.14),
"true_value": true,
},
time.Now(),
)
Expand Down Expand Up @@ -48,6 +53,8 @@ func assertTagValue(t *testing.T, expected interface{}, tag string, tags map[str

func TestRetainsMetric(t *testing.T) {
mapper := EnumMapper{}
err := mapper.Init()
require.Nil(t, err)
source := createTestMetric()

target := mapper.Apply(source)[0]
Expand All @@ -64,15 +71,17 @@ func TestRetainsMetric(t *testing.T) {

func TestMapsSingleStringValueTag(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Tag: "tag", ValueMappings: map[string]interface{}{"tag_value": "valuable"}}}}

err := mapper.Init()
require.Nil(t, err)
tags := calculateProcessedTags(mapper, createTestMetric())

assertTagValue(t, "valuable", "tag", tags)
}

func TestNoFailureOnMappingsOnNonSupportedValuedFields(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "float_value", ValueMappings: map[string]interface{}{"3.14": "pi"}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, float64(3.14), "float_value", fields)
Expand Down Expand Up @@ -110,6 +119,8 @@ func TestMappings(t *testing.T) {
field_name := mapping["field_name"][0].(string)
for index := range mapping["target_value"] {
mapper := EnumMapper{Mappings: []Mapping{{Field: field_name, ValueMappings: map[string]interface{}{mapping["target_value"][index].(string): mapping["mapped_value"][index]}}}}
err := mapper.Init()
assert.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, mapping["expected_value"][index], field_name, fields)
}
Expand All @@ -118,31 +129,35 @@ func TestMappings(t *testing.T) {

func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, 42, "string_value", fields)
}

func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, 1, "string_value", fields)
}

func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, "test", "string_value", fields)
}

func TestWritesToDestination(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, "test", "string_value", fields)
Expand All @@ -152,10 +167,30 @@ func TestWritesToDestination(t *testing.T) {
func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) {
field := "string_code"
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: field, ValueMappings: map[string]interface{}{"other": int64(1)}}}}

err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, "test", "string_value", fields)
_, present := fields[field]
assert.False(t, present, "value of field '"+field+"' was present")
}

func TestFieldGlobMatching(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "*", ValueMappings: map[string]interface{}{"test": "glob"}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())

assertFieldValue(t, "glob", "string_value", fields)
assertFieldValue(t, "glob", "duplicate_string_value", fields)
}

func TestTagGlobMatching(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Tag: "*", ValueMappings: map[string]interface{}{"tag_value": "glob"}}}}
err := mapper.Init()
require.Nil(t, err)
tags := calculateProcessedTags(mapper, createTestMetric())

assertTagValue(t, "glob", "tag", tags)
}

0 comments on commit 920a51a

Please sign in to comment.