Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logfmt parser #4539

Merged
merged 22 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Logfmt](#logfmt)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -886,4 +887,23 @@ the file output will only print once per `flush_interval`.
- If successful, add the next token, update the pattern and retest.
- Continue one token at a time until the entire line is successfully parsed.


# Logfmt
This parser implements the logfmt format by extracting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the plugin will produce one metric per line and all keys
are added as fields. Values are left as strings (for now).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values are left as strings

The code below doesn't do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values are left as strings (for now).

This isn't true, we are doing auto conversion to other types.

A typical log
```
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z
connect=4ms service=8ms status=200 bytes=1653
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example should be a single line

```
will be converted into
```
"method": "GET"
"host": "influxdata.org",
"ts": "2018-07-24T19:43:40.275Z",
"connect": "4ms",
"service": "8ms",
"status": 200,
"bytes": 1653,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Show the converted version as line protocol.

```
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).
115 changes: 115 additions & 0 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Package logfmt converts logfmt data into metrics. New comment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a pass through the file and make sure comments are all useful and well written.

package logfmt

import (
"bytes"
"fmt"
"log"
"strconv"
"time"

glogfmt "github.com/go-logfmt/logfmt"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to alias the import path since there is no conflict.

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

var (
ErrNoMetric = fmt.Errorf("no metric in line")
)

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
MetricName string
DefaultTags map[string]string
Now func() time.Time
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
return &Parser{
MetricName: metricName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is not set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, doesn't telegraf use the plugin name of the input/processor that uses it?

DefaultTags: defaultTags,
Now: time.Now,
}
}

// Parse converts a slice of bytes in logfmt format to metrics.
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
reader := bytes.NewReader(b)
decoder := glogfmt.NewDecoder(reader)
metrics := make([]telegraf.Metric, 0)
for decoder.ScanRecord() {
tags := make(map[string]string)
fields := make(map[string]interface{})
for decoder.ScanKeyval() {
if string(decoder.Value()) == "" {
return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key()))
}

//attempt type conversions
value := string(decoder.Value())
if iValue, err := strconv.Atoi(value); err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use strconv.ParseInt here because it handles 64-bit ints correctly.

//log.Printf("Print Atoi Value Here:", iValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up (delete) log messages.

//log.Printf("DECODER =", decoder.Key())
fields[string(decoder.Key())] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
log.Printf("key:%s, value:%s", decoder.Key(), value)
//log.Printf("Print ParseFloat Value Here:", fValue)
fields[string(decoder.Key())] = fValue
} else if bValue, err := strconv.ParseBool(value); err == nil {
//log.Printf("Print ParseBool Value Here:", bValue)
fields[string(decoder.Key())] = bValue
} else {
log.Printf("key:%s, value:%s", decoder.Key(), value)
// log.Printf("Print Value Here:", value)
fields[string(decoder.Key())] = value
//log.Printf("DECODER =", decoder.Key())
}
}
log.Printf("All fields: %s", fields)
m, err := metric.New(p.MetricName, tags, fields, p.Now())
//log.Printf("Return all the info in metric", p.MetricName, tags, fields)
if err != nil {
log.Println("Error occurred")
return nil, err
}

//add default tags
metrics = append(metrics, m)
p.applyDefaultTags(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call only once at end of function, since this applies the tags to all metrics.

}
return metrics, nil
}

// ParseLine converts a single line of text in logfmt to metrics.
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(s))
if err != nil {
return nil, err
}

if len(metrics) < 1 {
//if metrics[1] == nil {
return nil, ErrNoMetric
}
return metrics[0], nil
}

// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine.
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
if len(p.DefaultTags) == 0 {
return
}

for _, m := range metrics {
for k, v := range p.DefaultTags {
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}
210 changes: 210 additions & 0 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package logfmt

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
t.Helper()
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
if err != nil {
t.Fatal(err)
}
return v
}

func TestParse(t *testing.T) {
tests := []struct {
name string
measurement string
now func() time.Time
bytes []byte
want []testutil.Metric
wantErr bool
}{
{
name: "no bytes returns no metrics",
now: func() time.Time { return time.Unix(0, 0) },
want: []testutil.Metric{},
},
{
name: "test without trailing end",
bytes: []byte("foo=\"bar\""),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "test with trailing end",
bytes: []byte("foo=\"bar\"\n"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parser returns all the fields",
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parses every line",
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"parent_id": "088876RL000",
"duration": 7.45,
"log_id": "09R4e4Rl000",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "poorly formatted logfmt returns error",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`i am garbage data.`),
want: []testutil.Metric{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.Parse(tt.bytes)
if (err != nil) != tt.wantErr {
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
return
}
require.Equal(t, len(tt.want), len(got))
for i, m := range got {
testutil.MustEqual(t, m, tt.want[i])
//log.Printf("Are they equal", t, tt.want[i], m)
}
})
}
}

func TestParseLine(t *testing.T) {
tests := []struct {
name string
s string
measurement string
now func() time.Time
want testutil.Metric
wantErr bool
}{
{
name: "No Metric In line",
now: func() time.Time { return time.Unix(0, 0) },
want: testutil.Metric{},
wantErr: true,
},
{
name: "Log parser fmt returns all fields",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
{
name: "Log parser only returns metrics from first string",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000 "/n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the newline test here \n, like up on line 89

method=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000`,
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.ParseLine(tt.s)
if (err != nil) != tt.wantErr {
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
}
if got != nil {
testutil.MustEqual(t, got, tt.want)
}
})
}
}
8 changes: 8 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
)
Expand Down Expand Up @@ -139,6 +140,8 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimeZone)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand Down Expand Up @@ -238,3 +241,8 @@ func NewDropwizardParser(
}
return parser, err
}

// NewLogFmtParser returns a logfmt parser with the default options.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
return logfmt.NewParser(metricName, defaultTags), nil
}