-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
430d710
commit e893dc3
Showing
4 changed files
with
367 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package logfmt | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/go-logfmt/logfmt" | ||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
) | ||
|
||
var ( | ||
ErrNoMetric = fmt.Errorf("no metric in line") | ||
) | ||
|
||
// Parser decodes logfmt formatted messages into metrics. | ||
type Parser struct { | ||
MetricName string | ||
DefaultTags map[string]string | ||
Now func() time.Time | ||
} | ||
|
||
// NewParser creates a parser. | ||
func NewParser(metricName string, defaultTags map[string]string) *Parser { | ||
return &Parser{ | ||
MetricName: metricName, | ||
DefaultTags: defaultTags, | ||
Now: time.Now, | ||
} | ||
} | ||
|
||
// Parse converts a slice of bytes in logfmt format to metrics. | ||
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { | ||
reader := bytes.NewReader(b) | ||
decoder := logfmt.NewDecoder(reader) | ||
metrics := make([]telegraf.Metric, 0) | ||
for { | ||
ok := decoder.ScanRecord() | ||
if !ok { | ||
err := decoder.Err() | ||
if err != nil { | ||
return nil, err | ||
} | ||
break | ||
} | ||
fields := make(map[string]interface{}) | ||
for decoder.ScanKeyval() { | ||
if string(decoder.Value()) == "" { | ||
continue | ||
} | ||
|
||
//type conversions | ||
value := string(decoder.Value()) | ||
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { | ||
fields[string(decoder.Key())] = iValue | ||
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil { | ||
fields[string(decoder.Key())] = fValue | ||
} else if bValue, err := strconv.ParseBool(value); err == nil { | ||
fields[string(decoder.Key())] = bValue | ||
} else { | ||
fields[string(decoder.Key())] = value | ||
} | ||
} | ||
if len(fields) == 0 { | ||
continue | ||
} | ||
|
||
m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metrics = append(metrics, m) | ||
} | ||
p.applyDefaultTags(metrics) | ||
return metrics, nil | ||
} | ||
|
||
// ParseLine converts a single line of text in logfmt format to metrics. | ||
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { | ||
metrics, err := p.Parse([]byte(s)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(metrics) < 1 { | ||
return nil, ErrNoMetric | ||
} | ||
return metrics[0], nil | ||
} | ||
|
||
// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. | ||
func (p *Parser) SetDefaultTags(tags map[string]string) { | ||
p.DefaultTags = tags | ||
} | ||
|
||
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { | ||
if len(p.DefaultTags) == 0 { | ||
return | ||
} | ||
|
||
for _, m := range metrics { | ||
for k, v := range p.DefaultTags { | ||
if !m.HasTag(k) { | ||
m.AddTag(k, v) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
package logfmt | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { | ||
t.Helper() | ||
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
return v | ||
} | ||
|
||
func TestParse(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
measurement string | ||
now func() time.Time | ||
bytes []byte | ||
want []testutil.Metric | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "no bytes returns no metrics", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
want: []testutil.Metric{}, | ||
}, | ||
{ | ||
name: "test without trailing end", | ||
bytes: []byte("foo=\"bar\""), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"foo": "bar", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "test with trailing end", | ||
bytes: []byte("foo=\"bar\"\n"), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"foo": "bar", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "logfmt parser returns all the fields", | ||
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"lvl": "info", | ||
"msg": "http request", | ||
"method": "POST", | ||
"ts": "2018-07-24T19:43:40.275Z", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "logfmt parser parses every line", | ||
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"lvl": "info", | ||
"msg": "http request", | ||
"method": "POST", | ||
"ts": "2018-07-24T19:43:40.275Z", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"parent_id": "088876RL000", | ||
"duration": 7.45, | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "keys without = or values are ignored", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
bytes: []byte(`i am no data.`), | ||
want: []testutil.Metric{}, | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "keys without values are ignored", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
bytes: []byte(`foo="" bar=`), | ||
want: []testutil.Metric{}, | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "unterminated quote produces error", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
bytes: []byte(`bar=baz foo="bar`), | ||
want: []testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "malformed key", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
bytes: []byte(`"foo=" bar=baz`), | ||
want: []testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
l := Parser{ | ||
MetricName: tt.measurement, | ||
Now: tt.now, | ||
} | ||
got, err := l.Parse(tt.bytes) | ||
if (err != nil) != tt.wantErr { | ||
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) | ||
return | ||
} | ||
require.Equal(t, len(tt.want), len(got)) | ||
for i, m := range got { | ||
testutil.MustEqual(t, m, tt.want[i]) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestParseLine(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
s string | ||
measurement string | ||
now func() time.Time | ||
want testutil.Metric | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "No Metric In line", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
want: testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "Log parser fmt returns all fields", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, | ||
want: testutil.Metric{ | ||
Measurement: "testlog", | ||
Fields: map[string]interface{}{ | ||
"ts": "2018-07-24T19:43:35.207268Z", | ||
"lvl": int64(5), | ||
"msg": "Write failed", | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Tags: map[string]string{}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
{ | ||
name: "ParseLine only returns metrics from first string", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", | ||
want: testutil.Metric{ | ||
Measurement: "testlog", | ||
Fields: map[string]interface{}{ | ||
"ts": "2018-07-24T19:43:35.207268Z", | ||
"lvl": int64(5), | ||
"msg": "Write failed", | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Tags: map[string]string{}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
l := Parser{ | ||
MetricName: tt.measurement, | ||
Now: tt.now, | ||
} | ||
got, err := l.ParseLine(tt.s) | ||
if (err != nil) != tt.wantErr { | ||
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) | ||
} | ||
if got != nil { | ||
testutil.MustEqual(t, got, tt.want) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.