Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logfmt parser #4539

Merged
merged 22 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Logfmt](#logfmt)
1. [Wavefront](#wavefront)

Telegraf metrics, like InfluxDB
Expand Down Expand Up @@ -882,6 +883,22 @@ the file output will only print once per `flush_interval`.
- If successful, add the next token, update the pattern and retest.
- Continue one token at a time until the entire line is successfully parsed.

# Logfmt
This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the plugin will produce one metric per line and all keys
are added as fields.
A typical log
```
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z
connect=4ms service=8ms status=200 bytes=1653
```
will be converted into
```
logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i

```
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).

# Wavefront:

Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
Expand Down
111 changes: 111 additions & 0 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package logfmt

import (
"bytes"
"fmt"
"strconv"
"time"

"github.com/go-logfmt/logfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

var (
ErrNoMetric = fmt.Errorf("no metric in line")
)

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
MetricName string
DefaultTags map[string]string
Now func() time.Time
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
return &Parser{
MetricName: metricName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is not set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, doesn't telegraf use the plugin name of the input/processor that uses it?

DefaultTags: defaultTags,
Now: time.Now,
}
}

// Parse converts a slice of bytes in logfmt format to metrics.
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
reader := bytes.NewReader(b)
decoder := logfmt.NewDecoder(reader)
metrics := make([]telegraf.Metric, 0)
for {
ok := decoder.ScanRecord()
if !ok {
err := decoder.Err()
if err != nil {
return nil, err
}
break
}
fields := make(map[string]interface{})
for decoder.ScanKeyval() {
if string(decoder.Value()) == "" {
continue
}

//type conversions
value := string(decoder.Value())
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
fields[string(decoder.Key())] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
fields[string(decoder.Key())] = fValue
} else if bValue, err := strconv.ParseBool(value); err == nil {
fields[string(decoder.Key())] = bValue
} else {
fields[string(decoder.Key())] = value
}
}
if len(fields) == 0 {
continue
}

m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now())
if err != nil {
return nil, err
}

metrics = append(metrics, m)
}
p.applyDefaultTags(metrics)
return metrics, nil
}

// ParseLine converts a single line of text in logfmt format to metrics.
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(s))
if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, ErrNoMetric
}
return metrics[0], nil
}

// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine.
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
if len(p.DefaultTags) == 0 {
return
}

for _, m := range metrics {
for k, v := range p.DefaultTags {
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}
231 changes: 231 additions & 0 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package logfmt

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
t.Helper()
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
if err != nil {
t.Fatal(err)
}
return v
}

func TestParse(t *testing.T) {
tests := []struct {
name string
measurement string
now func() time.Time
bytes []byte
want []testutil.Metric
wantErr bool
}{
{
name: "no bytes returns no metrics",
now: func() time.Time { return time.Unix(0, 0) },
want: []testutil.Metric{},
},
{
name: "test without trailing end",
bytes: []byte("foo=\"bar\""),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "test with trailing end",
bytes: []byte("foo=\"bar\"\n"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parser returns all the fields",
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parser parses every line",
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"parent_id": "088876RL000",
"duration": 7.45,
"log_id": "09R4e4Rl000",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "keys without = or values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`i am no data.`),
want: []testutil.Metric{},
wantErr: false,
},
{
name: "keys without values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`foo="" bar=`),
want: []testutil.Metric{},
wantErr: false,
},
{
name: "unterminated quote produces error",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`bar=baz foo="bar`),
want: []testutil.Metric{},
wantErr: true,
},
{
name: "malformed key",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`"foo=" bar=baz`),
want: []testutil.Metric{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.Parse(tt.bytes)
if (err != nil) != tt.wantErr {
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
return
}
require.Equal(t, len(tt.want), len(got))
for i, m := range got {
testutil.MustEqual(t, m, tt.want[i])
}
})
}
}

func TestParseLine(t *testing.T) {
tests := []struct {
name string
s string
measurement string
now func() time.Time
want testutil.Metric
wantErr bool
}{
{
name: "No Metric In line",
now: func() time.Time { return time.Unix(0, 0) },
want: testutil.Metric{},
wantErr: true,
},
{
name: "Log parser fmt returns all fields",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
{
name: "ParseLine only returns metrics from first string",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000",
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.ParseLine(tt.s)
if (err != nil) != tt.wantErr {
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
}
if got != nil {
testutil.MustEqual(t, got, tt.want)
}
})
}
}
Loading