Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parser processor #4551

Merged
merged 24 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/processors/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/enum"
_ "github.com/influxdata/telegraf/plugins/processors/override"
_ "github.com/influxdata/telegraf/plugins/processors/printer"
_ "github.com/influxdata/telegraf/plugins/processors/parser"
_ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/topk"
)
57 changes: 57 additions & 0 deletions plugins/processors/parser/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Parser Processor Plugin
This plugin parses defined fields containing the specified data format.

## Configuration
```toml
[[processors.parser]]
## specify the name of the field[s] whose value will be parsed
parse_fields = ["message"]

## specify what to do with the original message. [merge|replace|keep] default=keep
original = "merge"


#fielddrop = "message"

[processors.parser.config]
data_format = "json"
## additional configurations for parser go here
tag_keys = ["verb", "request"]
```

### Tags:

User specified tags may be added by this processor.

### Example Config:
```toml
[[inputs.exec]]
data_format = "influx"
commands = [
"echo -en 'thing,host=\"mcfly\" message=\"{\\\"verb\\\":\\\"GET\\\",\\\"request\\\":\\\"/time/to/awesome\\\"}\" 1519652321000000000'"
]

[[processors.parser]]
## specify the name of the field[s] whose value will be parsed
parse_fields = ["message"]

## specify what to do with the original message. [merge|replace|keep] default=keep
original = "merge"

[processors.parser.config]
data_format = "json"
## additional configurations for parser go here
tag_keys = ["verb", "request"]

[[outputs.file]]
files = ["stdout"]
```

### Example Output [original=merge]:
```
# input = nginx_requests,host="mcfly" message="{\"verb\":\"GET\",\"request\":\"/time/to/awesome\"}" 1519652321000000000
nginx_requests,host="mcfly",verb="GET",request="/time/to/awesome" message="{\"verb\":\"GET\",\"request\":\"/time/to/awesome\"}" 1519652321000000000
```

### Caveats
While this plugin is functional, it may not work in *every* scenario. For the above example, "keep" and "replace" fail because the parsed field produces a metric with no fields. This leads to errors when serializing the output.
129 changes: 129 additions & 0 deletions plugins/processors/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package parser

import (
"fmt"
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)

type Parser struct {
Config parsers.Config `toml:"config"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an embedded field for the parsers.Config:


type Parser struct {
	parsers.Config `toml:"config"`

This should make it so you can specify the options directly on the Parser object, similar to how it is set in input plugins:

[[processors.parser]]
  parse_fields = ["message"]
  original = "merge"
  data_format = "json"
  tag_keys = ["verb", "request"]

Original string `toml:"original"` // merge, replace, or keep (default)
ParseFields []string `toml:"parse_fields"`
Parser parsers.Parser `toml:"parser"`
}

// holds a default sample config
var SampleConfig = `
## specify the name of the field[s] whose value will be parsed
parse_fields = []

## specify what to do with the original message. [merge|replace|keep] default=keep
original = "keep"

[processors.parser.config]
# data_format = "logfmt"
## additional configurations for parser go here
`

// returns the default config
func (p *Parser) SampleConfig() string {
return SampleConfig
}

// returns a brief description of the processor
func (p *Parser) Description() string {
return "Parse a value in a specified field/tag(s) and add the result in a new metric"
}

func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if p.Parser == nil {
var err error
p.Parser, err = parsers.NewParser(&p.Config)
if err != nil {
log.Printf("E! [processors.parser] could not create parser: %v", err)
return metrics
}
}

rMetrics := []telegraf.Metric{}
if p.Original != "replace" {
rMetrics = metrics
}

name := ""

for _, metric := range metrics {
if n := metric.Name(); n != "" {
name = n
}
sMetrics := []telegraf.Metric{}
for _, key := range p.ParseFields {
if value, ok := metric.Fields()[key]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use .Fields() since it allocates a new map, (I really need to add a comment suggesting that it not be used). Instead use .FieldList(). You will probably want to reorder the loops since it think ParseFields should always be shorter than Fields.

strVal := fmt.Sprintf("%v", value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not a string then log a warning and skip.

nMetrics, err := p.parseField(strVal)
if err != nil {
log.Printf("E! [processors.parser] could not parse field %v: %v", key, err)
switch p.Original {
case "keep":
return metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right... don't we need to parse all fields before we can return?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, the tests missed this.

case "merge":
nMetrics = metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all confused by the variable names: rMetric, sMetric, nMetric. Can you try to improve these? The Metric part is less interesting than the r, s, n parts, since we can easily lookup the type.

}
}
sMetrics = append(sMetrics, nMetrics...)
} else {
fmt.Println("key not found", key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably ignore this situation to allow fields to be optional.

}
}
rMetrics = append(rMetrics, p.mergeTagsFields(sMetrics...)...)
}
if p.Original == "merge" {
rMetrics = p.mergeTagsFields(rMetrics...)
}

return p.setName(name, rMetrics...)

}

func (p Parser) setName(name string, metrics ...telegraf.Metric) []telegraf.Metric {
if len(metrics) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need this, if you remove it then it will just range over nothing and return metrics.

return nil
}

for i := range metrics {
metrics[i].SetName(name)
}

return metrics
}

func (p Parser) mergeTagsFields(metrics ...telegraf.Metric) []telegraf.Metric {
if len(metrics) == 0 {
return nil
}

rMetric := metrics[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a problem if more than one metric is passed in to Apply. Secretly this never happens but the interface doesn't promise this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than one metric does get passed into this function, it is intended for all the tags and fields to get merged down to the first metric

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than one metric does get passed into this function, it is intended for all the tags and fields to get merged down to the first metric

for _, metric := range metrics {
for key, field := range metric.Fields() {
rMetric.AddField(key, field)
}
for key, tag := range metric.Tags() {
rMetric.AddTag(key, tag)
}
}
return []telegraf.Metric{rMetric}
}

func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
return p.Parser.Parse([]byte(value))
}

func init() {
processors.Add("parser", func() telegraf.Processor {
return &Parser{Original: "keep"}
})
}
Loading