Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for converting tag or field to measurement in converter processor #7049

Merged
merged 4 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions plugins/processors/converter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Values that cannot be converted are dropped.
uniquely identifiable. Fields with the same series key (measurement + tags)
will overwrite one another.

### Configuration:
### Configuration
```toml
# Convert values to another metric value type
[[processors.converter]]
Expand All @@ -19,6 +19,7 @@ will overwrite one another.
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
measurement = []
string = []
integer = []
unsigned = []
Expand All @@ -31,6 +32,7 @@ will overwrite one another.
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
measurement = []
tag = []
string = []
integer = []
Expand All @@ -39,19 +41,40 @@ will overwrite one another.
float = []
```

### Examples:
### Example

Convert `port` tag to a string field:
```toml
[[processors.converter]]
[processors.converter.tags]
string = ["port"]
```

```diff
- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0
+ apache,server=debian-stretch-apache port="80",BusyWorkers=1,BytesPerReq=0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these be in the same order to be more clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll update this, it should read better if I move the port to the end of the fields:

+ apache,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,port="80"

```

Convert all `scboard_*` fields to an integer:
```toml
[[processors.converter]]
[processors.converter.fields]
integer = ["scboard_*"]
tag = ["ParentServerConfigGeneration"]
```

```diff
- apache,port=80,server=debian-stretch-apache BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerConfigGeneration=3,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49 1502489900000000000
+ apache,server=debian-stretch-apache,ParentServerConfigGeneration=3 port="80",BusyWorkers=1,BytesPerReq=0,BytesPerSec=0,CPUChildrenSystem=0,CPUChildrenUser=0,CPULoad=0.00995025,CPUSystem=0.01,CPUUser=0.01,ConnsAsyncClosing=0,ConnsAsyncKeepAlive=0,ConnsAsyncWriting=0,ConnsTotal=0,IdleWorkers=49,Load1=0.01,Load15=0,Load5=0,ParentServerMPMGeneration=2,ReqPerSec=0.00497512,ServerUptimeSeconds=201,TotalAccesses=1,TotalkBytes=0,Uptime=201,scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i 1502489900000000000
- apache scboard_closing=0,scboard_dnslookup=0,scboard_finishing=0,scboard_idle_cleanup=0,scboard_keepalive=0,scboard_logging=0,scboard_open=100,scboard_reading=0,scboard_sending=1,scboard_starting=0,scboard_waiting=49
+ apache scboard_closing=0i,scboard_dnslookup=0i,scboard_finishing=0i,scboard_idle_cleanup=0i,scboard_keepalive=0i,scboard_logging=0i,scboard_open=100i,scboard_reading=0i,scboard_sending=1i,scboard_starting=0i,scboard_waiting=49i
```

Rename the measurement from a tag value:
```toml
[[processors.converter]]
[processors.converter.tags]
measurement = ["topic"]
```

```diff
- mqtt_consumer,topic=sensor temp=42
+ sensor temp=42
```
114 changes: 61 additions & 53 deletions plugins/processors/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package converter

import (
"fmt"
"log"
"math"
"strconv"

Expand All @@ -18,6 +17,7 @@ var sampleConfig = `
## select the keys to convert. The array may contain globs.
## <target-type> = [<tag-key>...]
[processors.converter.tags]
measurement = []
string = []
integer = []
unsigned = []
Expand All @@ -30,6 +30,7 @@ var sampleConfig = `
## select the keys to convert. The array may contain globs.
## <target-type> = [<field-key>...]
[processors.converter.fields]
measurement = []
tag = []
string = []
integer = []
Expand All @@ -39,30 +40,32 @@ var sampleConfig = `
`

type Conversion struct {
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
Measurement []string `toml:"measurement"`
Tag []string `toml:"tag"`
String []string `toml:"string"`
Integer []string `toml:"integer"`
Unsigned []string `toml:"unsigned"`
Boolean []string `toml:"boolean"`
Float []string `toml:"float"`
}

type Converter struct {
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
Tags *Conversion `toml:"tags"`
Fields *Conversion `toml:"fields"`
Log telegraf.Logger `toml:"-"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should log be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does needs to be exported, which is why we mark it unsettable from the toml. This is due to how we inject the Logger from outside the package (which we have never been super happy with). When adding this we also toyed around with injecting the logger on init: Init(Logger) error but ultimately decided not to inject it there.


initialized bool
tagConversions *ConversionFilter
fieldConversions *ConversionFilter
}

type ConversionFilter struct {
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
Measurement filter.Filter
Tag filter.Filter
String filter.Filter
Integer filter.Filter
Unsigned filter.Filter
Boolean filter.Filter
Float filter.Filter
}

func (p *Converter) SampleConfig() string {
Expand All @@ -73,15 +76,11 @@ func (p *Converter) Description() string {
return "Convert values to another metric value type"
}

func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if !p.initialized {
err := p.compile()
if err != nil {
logPrintf("initialization error: %v\n", err)
return metrics
}
}
func (p *Converter) Init() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a contract change that we need to worry about? previously Apply would init for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Init() function is optional on any plugin type, but will be called if defined. It makes it a little bit easier to initialize a plugin without needing the manage it and improves the plugins ability to report errors with the configuration was setup improperly.

return p.compile()
}

func (p *Converter) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
for _, metric := range metrics {
p.convertTags(metric)
p.convertFields(metric)
Expand All @@ -106,7 +105,6 @@ func (p *Converter) compile() error {

p.tagConversions = tf
p.fieldConversions = ff
p.initialized = true
return nil
}

Expand All @@ -117,6 +115,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {

var err error
cf := &ConversionFilter{}
cf.Measurement, err = filter.Compile(conv.Measurement)
if err != nil {
return nil, err
}

cf.Tag, err = filter.Compile(conv.Tag)
if err != nil {
return nil, err
Expand Down Expand Up @@ -150,13 +153,19 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
return cf, nil
}

// convertTags converts tags into fields
// convertTags converts tags into measurements or fields.
func (p *Converter) convertTags(metric telegraf.Metric) {
if p.tagConversions == nil {
return
}

for key, value := range metric.Tags() {
if p.tagConversions.Measurement != nil && p.tagConversions.Measurement.Match(key) {
metric.RemoveTag(key)
metric.SetName(value)
continue
}

if p.tagConversions.String != nil && p.tagConversions.String.Match(key) {
metric.RemoveTag(key)
metric.AddField(key, value)
Expand All @@ -167,7 +176,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toInteger(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
continue
}

Expand All @@ -179,7 +188,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
continue
}

Expand All @@ -192,7 +201,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toBool(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to boolean [%T]: %v\n", value, value)
p.Log.Errorf("error converting to boolean [%T]: %v", value, value)
continue
}

Expand All @@ -205,7 +214,7 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
v, ok := toFloat(value)
if !ok {
metric.RemoveTag(key)
logPrintf("error converting to float [%T]: %v\n", value, value)
p.Log.Errorf("error converting to float [%T]: %v", value, value)
continue
}

Expand All @@ -216,18 +225,31 @@ func (p *Converter) convertTags(metric telegraf.Metric) {
}
}

// convertFields converts fields into tags or other field types
// convertFields converts fields into measurements, tags, or other field types.
func (p *Converter) convertFields(metric telegraf.Metric) {
if p.fieldConversions == nil {
return
}

for key, value := range metric.Fields() {
if p.fieldConversions.Measurement != nil && p.fieldConversions.Measurement.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
p.Log.Errorf("error converting to measurement [%T]: %v", value, value)
continue
}

metric.RemoveField(key)
metric.SetName(v)
continue
}

if p.fieldConversions.Tag != nil && p.fieldConversions.Tag.Match(key) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to tag [%T]: %v\n", value, value)
p.Log.Errorf("error converting to tag [%T]: %v", value, value)
continue
}

Expand All @@ -240,7 +262,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toFloat(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to float [%T]: %v\n", value, value)
p.Log.Errorf("error converting to float [%T]: %v", value, value)
continue
}

Expand All @@ -253,7 +275,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toInteger(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to integer [%T]: %v\n", value, value)
p.Log.Errorf("error converting to integer [%T]: %v", value, value)
continue
}

Expand All @@ -266,7 +288,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toUnsigned(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to unsigned [%T]: %v\n", value, value)
p.Log.Errorf("error converting to unsigned [%T]: %v", value, value)
continue
}

Expand All @@ -279,7 +301,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toBool(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to bool [%T]: %v\n", value, value)
p.Log.Errorf("error converting to bool [%T]: %v", value, value)
continue
}

Expand All @@ -292,7 +314,7 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
v, ok := toString(value)
if !ok {
metric.RemoveField(key)
logPrintf("error converting to string [%T]: %v\n", value, value)
p.Log.Errorf("Error converting to string [%T]: %v", value, value)
continue
}

Expand Down Expand Up @@ -336,7 +358,7 @@ func toInteger(v interface{}) (int64, bool) {
} else if value > float64(math.MaxInt64) {
return math.MaxInt64, true
} else {
return int64(Round(value)), true
return int64(math.Round(value)), true
}
case bool:
if value {
Expand Down Expand Up @@ -375,7 +397,7 @@ func toUnsigned(v interface{}) (uint64, bool) {
} else if value > float64(math.MaxUint64) {
return math.MaxUint64, true
} else {
return uint64(Round(value)), true
return uint64(math.Round(value)), true
}
case bool:
if value {
Expand Down Expand Up @@ -435,20 +457,6 @@ func toString(v interface{}) (string, bool) {
return "", false
}

// math.Round was not added until Go 1.10, can be removed when support for Go
// 1.9 is dropped.
func Round(x float64) float64 {
t := math.Trunc(x)
if math.Abs(x-t) >= 0.5 {
return t + math.Copysign(1, x)
}
return t
}

func logPrintf(format string, v ...interface{}) {
log.Printf("D! [processors.converter] "+format, v...)
}

func init() {
processors.Add("converter", func() telegraf.Processor {
return &Converter{}
Expand Down
Loading