From 53dd0d905da62e551a6a98d928dd6dcd56c5c975 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 31 Jul 2018 11:35:23 -0700 Subject: [PATCH 01/21] Add logfmt parser --- docs/DATA_FORMATS_INPUT.md | 6 +- plugins/parsers/logfmt/parser.go | 110 ++++++++++++++++++++++ plugins/parsers/logfmt/parser_test.go | 130 ++++++++++++++++++++++++++ plugins/parsers/registry.go | 8 ++ 4 files changed, 253 insertions(+), 1 deletion(-) create mode 100644 plugins/parsers/logfmt/parser.go create mode 100644 plugins/parsers/logfmt/parser_test.go diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 5a63e9d83690c..066db9c1292b6 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](#collectd) 1. [Dropwizard](#dropwizard) 1. [Grok](#grok) +1. [Logfmt](#logfmt) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -886,4 +887,7 @@ the file output will only print once per `flush_interval`. - If successful, add the next token, update the pattern and retest. - Continue one token at a time until the entire line is successfully parsed. - +# Logfmt +For extracting key-value pairs from log text in the form `=`. +At the moment, the plugin will produce one metric per line and all keys +are added as fields. Values are left as strings (for now). diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go new file mode 100644 index 0000000000000..5d19e4b3982d6 --- /dev/null +++ b/plugins/parsers/logfmt/parser.go @@ -0,0 +1,110 @@ +// Package logfmt converts logfmt data into metrics. +package logfmt + +import ( + "bytes" + "fmt" + "strconv" + "strings" + "time" + + glogfmt "github.com/go-logfmt/logfmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +// Parser decodes logfmt formatted messages into metrics. +type Parser struct { + MetricName string + DefaultTags map[string]string + Now func() time.Time +} + +// NewParser creates a parser. +func NewParser(metricName string, defaultTags map[string]string) *Parser { + return &Parser{ + MetricName: metricName, + DefaultTags: defaultTags, + Now: time.Now, + } +} + +// Parse converts a slice of bytes in logfmt format to metrics. +func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { + reader := bytes.NewReader(b) + decoder := glogfmt.NewDecoder(reader) + metrics := make([]telegraf.Metric, 0) + for decoder.ScanRecord() { + tags := make(map[string]string) + fields := make(map[string]interface{}) + //add default tags + for k, v := range l.DefaultTags { + tags[k] = v + } + + for decoder.ScanKeyval() { + if string(decoder.Value()) == "" { + return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) + } + + //attempt type conversions + value := string(decoder.Value()) + if iValue, err := strconv.Atoi(value); err == nil { + fields[string(decoder.Key())] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + fields[string(decoder.Key())] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[string(decoder.Key())] = bValue + } else { + fields[string(decoder.Key())] = value + } + } + m, err := metric.New(l.MetricName, tags, fields, l.Now()) + if err != nil { + return nil, err + } + metrics = append(metrics, m) + } + return metrics, nil +} + +// ParseLine converts a single line of text in logfmt to metrics. +func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { + reader := strings.NewReader(s) + decoder := glogfmt.NewDecoder(reader) + + decoder.ScanRecord() + tags := make(map[string]string) + fields := make(map[string]interface{}) + //add default tags + for k, v := range l.DefaultTags { + tags[k] = v + } + + for decoder.ScanKeyval() { + if string(decoder.Value()) == "" { + return nil, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) + } + //attempt type conversions + value := string(decoder.Value()) + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + fields[string(decoder.Key())] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + fields[string(decoder.Key())] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[string(decoder.Key())] = bValue + } else { + fields[string(decoder.Key())] = value + } + } + m, err := metric.New(l.MetricName, tags, fields, l.Now()) + if err != nil { + return nil, err + } + return m, nil +} + +// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. +func (l *Parser) SetDefaultTags(tags map[string]string) { + l.DefaultTags = tags +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go new file mode 100644 index 0000000000000..4bbb9784a4e10 --- /dev/null +++ b/plugins/parsers/logfmt/parser_test.go @@ -0,0 +1,130 @@ +package logfmt + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { + t.Helper() + v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) + if err != nil { + t.Fatal(err) + } + return v +} + +func TestParse(t *testing.T) { + tests := []struct { + name string + measurement string + now func() time.Time + bytes []byte + want []testutil.Metric + wantErr bool + }{ + { + name: "no bytes returns no metrics", + now: func() time.Time { return time.Unix(0, 0) }, + want: []testutil.Metric{}, + }, + { + name: "logfmt parser returns all the fields", + bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + Time: time.Unix(0, 0), + }, + }, + }, + { + name: "poorly formatted logfmt returns error", + now: func() time.Time { return time.Unix(0, 0) }, + bytes: []byte(`i am garbage data.`), + want: []testutil.Metric{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.Parse(tt.bytes) + if (err != nil) != tt.wantErr { + t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + return + } + for i, m := range got { + testutil.MustEqual(t, m, tt.want[i]) + } + }) + } +} + +func TestParseLine(t *testing.T) { + tests := []struct { + name string + s string + measurement string + now func() time.Time + want testutil.Metric + wantErr bool + }{ + { + name: "test something", + now: func() time.Time { return time.Unix(0, 0) }, + want: testutil.Metric{ + Tags: map[string]string{}, + Fields: map[string]interface{}{}, + Time: time.Unix(0, 0), + }, + }, + { + name: "log parser fmt returns all fields", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, + want: testutil.Metric{ + Measurement: "testlog", + Fields: map[string]interface{}{ + "ts": "2018-07-24T19:43:35.207268Z", + "lvl": int64(5), + "msg": "Write failed", + "log_id": "09R4e4Rl000", + }, + Tags: map[string]string{}, + Time: time.Unix(0, 0), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.ParseLine(tt.s) + if (err != nil) != tt.wantErr { + t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + return + } + testutil.MustEqual(t, got, tt.want) + }) + } +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 24e73d4b63ca6..43795ca193aa6 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/value" ) @@ -139,6 +140,8 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatterns, config.GrokCustomPatternFiles, config.GrokTimeZone) + case "logfmt": + parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -238,3 +241,8 @@ func NewDropwizardParser( } return parser, err } + +// NewLogFmtParser returns a logfmt parser with the default options. +func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) { + return logfmt.NewParser(metricName, defaultTags), nil +} From 4ccfea3838c15fc587491f4364b5fdb1a1de3c77 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Tue, 31 Jul 2018 16:00:02 -0600 Subject: [PATCH 02/21] add applyDefaultTags function so that tags will be set at the end of a parse --- plugins/parsers/logfmt/parser.go | 35 +++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 5d19e4b3982d6..526308f5a2431 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -30,18 +30,13 @@ func NewParser(metricName string, defaultTags map[string]string) *Parser { } // Parse converts a slice of bytes in logfmt format to metrics. -func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { +func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) decoder := glogfmt.NewDecoder(reader) metrics := make([]telegraf.Metric, 0) for decoder.ScanRecord() { tags := make(map[string]string) fields := make(map[string]interface{}) - //add default tags - for k, v := range l.DefaultTags { - tags[k] = v - } - for decoder.ScanKeyval() { if string(decoder.Value()) == "" { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) @@ -59,17 +54,19 @@ func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { fields[string(decoder.Key())] = value } } - m, err := metric.New(l.MetricName, tags, fields, l.Now()) + m, err := metric.New(p.MetricName, tags, fields, p.Now()) if err != nil { return nil, err } metrics = append(metrics, m) } + //add default tags + p.applyDefaultTags(metrics) return metrics, nil } // ParseLine converts a single line of text in logfmt to metrics. -func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { +func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { reader := strings.NewReader(s) decoder := glogfmt.NewDecoder(reader) @@ -77,7 +74,7 @@ func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { tags := make(map[string]string) fields := make(map[string]interface{}) //add default tags - for k, v := range l.DefaultTags { + for k, v := range p.DefaultTags { tags[k] = v } @@ -97,7 +94,7 @@ func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { fields[string(decoder.Key())] = value } } - m, err := metric.New(l.MetricName, tags, fields, l.Now()) + m, err := metric.New(p.MetricName, tags, fields, p.Now()) if err != nil { return nil, err } @@ -105,6 +102,20 @@ func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { } // SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. -func (l *Parser) SetDefaultTags(tags map[string]string) { - l.DefaultTags = tags +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + +func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { + if len(p.DefaultTags) == 0 { + return + } + + for _, m := range metrics { + for k, v := range p.DefaultTags { + if !m.HasTag(k) { + m.AddTag(k, v) + } + } + } } From 89f783e44098078071481f825338cbf7bd4230b6 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Tue, 31 Jul 2018 18:08:10 -0600 Subject: [PATCH 03/21] Reformat the ParseLine function so that there is less duplicate code between Parse and ParseLine. --- plugins/parsers/logfmt/parser.go | 41 +++++++++----------------------- 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 526308f5a2431..f40d7f129d277 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -3,9 +3,9 @@ package logfmt import ( "bytes" + "errors" "fmt" "strconv" - "strings" "time" glogfmt "github.com/go-logfmt/logfmt" @@ -13,6 +13,10 @@ import ( "github.com/influxdata/telegraf/metric" ) +var ( + ErrNoMetric = errors.New("no metric in line") +) + // Parser decodes logfmt formatted messages into metrics. type Parser struct { MetricName string @@ -67,38 +71,15 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { // ParseLine converts a single line of text in logfmt to metrics. func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { - reader := strings.NewReader(s) - decoder := glogfmt.NewDecoder(reader) - - decoder.ScanRecord() - tags := make(map[string]string) - fields := make(map[string]interface{}) - //add default tags - for k, v := range p.DefaultTags { - tags[k] = v - } - - for decoder.ScanKeyval() { - if string(decoder.Value()) == "" { - return nil, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) - } - //attempt type conversions - value := string(decoder.Value()) - if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { - fields[string(decoder.Key())] = iValue - } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { - fields[string(decoder.Key())] = fValue - } else if bValue, err := strconv.ParseBool(value); err == nil { - fields[string(decoder.Key())] = bValue - } else { - fields[string(decoder.Key())] = value - } - } - m, err := metric.New(p.MetricName, tags, fields, p.Now()) + metrics, err := p.Parse([]byte(s)) if err != nil { return nil, err } - return m, nil + + if len(metrics) < 1 { + return nil, ErrNoMetric + } + return metrics[0], nil } // SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. From 11e95872a1a0d978ab52e40c869ef89c7bc819f0 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Wed, 1 Aug 2018 14:01:26 -0600 Subject: [PATCH 04/21] Rename test --- plugins/parsers/logfmt/parser_test.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 4bbb9784a4e10..af306a3cb1ac0 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -87,16 +87,13 @@ func TestParseLine(t *testing.T) { wantErr bool }{ { - name: "test something", - now: func() time.Time { return time.Unix(0, 0) }, - want: testutil.Metric{ - Tags: map[string]string{}, - Fields: map[string]interface{}{}, - Time: time.Unix(0, 0), - }, + name: " No Metric In line", + now: func() time.Time { return time.Unix(0, 0) }, + want: testutil.Metric{}, + wantErr: true, }, { - name: "log parser fmt returns all fields", + name: "Log parser fmt returns all fields", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, From 1edf8ccf52d4115c90280afb14cafe8da73718cc Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Wed, 1 Aug 2018 17:13:55 -0600 Subject: [PATCH 05/21] ParseLine should fail if there are no metrics to Parse --- plugins/parsers/logfmt/parser.go | 3 +-- plugins/parsers/logfmt/parser_test.go | 9 +++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index f40d7f129d277..2b536b932c8e8 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -3,7 +3,6 @@ package logfmt import ( "bytes" - "errors" "fmt" "strconv" "time" @@ -14,7 +13,7 @@ import ( ) var ( - ErrNoMetric = errors.New("no metric in line") + ErrNoMetric = fmt.Errorf("no metric in line") ) // Parser decodes logfmt formatted messages into metrics. diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index af306a3cb1ac0..01e086837712f 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -87,7 +87,7 @@ func TestParseLine(t *testing.T) { wantErr bool }{ { - name: " No Metric In line", + name: "No Metric In line", now: func() time.Time { return time.Unix(0, 0) }, want: testutil.Metric{}, wantErr: true, @@ -118,10 +118,11 @@ func TestParseLine(t *testing.T) { } got, err := l.ParseLine(tt.s) if (err != nil) != tt.wantErr { - t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) - return + t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + } + if got != nil { + testutil.MustEqual(t, got, tt.want) } - testutil.MustEqual(t, got, tt.want) }) } } From 8ec40fb8b1896c31a71c45e296684e016a1d4426 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Mon, 6 Aug 2018 13:07:42 -0600 Subject: [PATCH 06/21] WIP: support parsing of multiple lines --- plugins/parsers/logfmt/parser.go | 31 +++++++++++++--- plugins/parsers/logfmt/parser_test.go | 51 ++++++++++++++++++++++++--- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 2b536b932c8e8..ed99026cefc8f 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -1,9 +1,10 @@ -// Package logfmt converts logfmt data into metrics. +// Package logfmt converts logfmt data into metrics. New comment package logfmt import ( "bytes" "fmt" + "log" "strconv" "time" @@ -37,9 +38,11 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) decoder := glogfmt.NewDecoder(reader) metrics := make([]telegraf.Metric, 0) + tags := make(map[string]string) + fields := make(map[string]interface{}) for decoder.ScanRecord() { - tags := make(map[string]string) - fields := make(map[string]interface{}) + //tags := make(map[string]string) + //fields := make(map[string]interface{}) for decoder.ScanKeyval() { if string(decoder.Value()) == "" { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) @@ -48,22 +51,40 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { //attempt type conversions value := string(decoder.Value()) if iValue, err := strconv.Atoi(value); err == nil { + //log.Printf("Print Atoi Value Here:", iValue) + //log.Printf("DECODER =", decoder.Key()) fields[string(decoder.Key())] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + log.Printf("key:%s, value:%s", decoder.Key(), value) + //log.Printf("Print ParseFloat Value Here:", fValue) fields[string(decoder.Key())] = fValue } else if bValue, err := strconv.ParseBool(value); err == nil { + //log.Printf("Print ParseBool Value Here:", bValue) fields[string(decoder.Key())] = bValue } else { + log.Printf("key:%s, value:%s", decoder.Key(), value) + // log.Printf("Print Value Here:", value) fields[string(decoder.Key())] = value + //log.Printf("DECODER =", decoder.Key()) } } - m, err := metric.New(p.MetricName, tags, fields, p.Now()) - if err != nil { + log.Printf("All fields: %s", fields) + //m, err := metric.New(p.MetricName, tags, fields, p.Now()) + //log.Printf("Return all the info in metric", p.MetricName, tags, fields) + /* if err != nil { + log.Println("Error occurred") return nil, err } metrics = append(metrics, m) + log.Printf("The final appended metrics %s", metrics) */ + } + m, err := metric.New(p.MetricName, tags, fields, p.Now()) + if err != nil { + log.Println("Error occurred") + return nil, err } //add default tags + metrics = append(metrics, m) p.applyDefaultTags(metrics) return metrics, nil } diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 01e086837712f..dbf43c80058d2 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -27,7 +27,7 @@ func TestParse(t *testing.T) { want []testutil.Metric wantErr bool }{ - { + /*{ name: "no bytes returns no metrics", now: func() time.Time { return time.Unix(0, 0) }, want: []testutil.Metric{}, @@ -50,14 +50,36 @@ func TestParse(t *testing.T) { Time: time.Unix(0, 0), }, }, - }, + },*/ { + name: "logfmt parsers every line", + bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + "parent_id": "088876RL000", + "duration": "7.45", + "log_id": "09R4e4Rl000", + }, + Time: time.Unix(0, 0), + }, + }, + }, + /*{ name: "poorly formatted logfmt returns error", now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am garbage data.`), want: []testutil.Metric{}, wantErr: true, - }, + },*/ } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -72,12 +94,13 @@ func TestParse(t *testing.T) { } for i, m := range got { testutil.MustEqual(t, m, tt.want[i]) + //log.Printf("Are they equal", t, tt.want[i], m) } }) } } -func TestParseLine(t *testing.T) { +/*func TestParseLine(t *testing.T) { tests := []struct { name string s string @@ -109,6 +132,24 @@ func TestParseLine(t *testing.T) { Time: time.Unix(0, 0), }, }, + { + name: "Log parser only returns metrics from first string", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000 "/n" + method=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000`, + want: testutil.Metric{ + Measurement: "testlog", + Fields: map[string]interface{}{ + "ts": "2018-07-24T19:43:35.207268Z", + "lvl": int64(5), + "msg": "Write failed", + "log_id": "09R4e4Rl000", + }, + Tags: map[string]string{}, + Time: time.Unix(0, 0), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -125,4 +166,4 @@ func TestParseLine(t *testing.T) { } }) } -} +}*/ From 4eeff2f495e16b5aa877244a37726c345e57cd30 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Mon, 6 Aug 2018 23:00:23 -0600 Subject: [PATCH 07/21] Add counter to check for empty metrics --- plugins/parsers/logfmt/parser.go | 19 ++++++++++++------- plugins/parsers/logfmt/parser_test.go | 16 +++++++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index ed99026cefc8f..b97e8626fe5a0 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -40,7 +40,9 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) tags := make(map[string]string) fields := make(map[string]interface{}) + counter := 0 for decoder.ScanRecord() { + counter++ //tags := make(map[string]string) //fields := make(map[string]interface{}) for decoder.ScanKeyval() { @@ -78,14 +80,16 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { metrics = append(metrics, m) log.Printf("The final appended metrics %s", metrics) */ } - m, err := metric.New(p.MetricName, tags, fields, p.Now()) - if err != nil { - log.Println("Error occurred") - return nil, err + if counter > 0 { + m, err := metric.New(p.MetricName, tags, fields, p.Now()) + if err != nil { + log.Println("Error occurred") + return nil, err + } + //add default tags + metrics = append(metrics, m) + p.applyDefaultTags(metrics) } - //add default tags - metrics = append(metrics, m) - p.applyDefaultTags(metrics) return metrics, nil } @@ -97,6 +101,7 @@ func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { } if len(metrics) < 1 { + //if metrics[1] == nil { return nil, ErrNoMetric } return metrics[0], nil diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index dbf43c80058d2..44aa7f17128aa 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { @@ -27,7 +28,7 @@ func TestParse(t *testing.T) { want []testutil.Metric wantErr bool }{ - /*{ + { name: "no bytes returns no metrics", now: func() time.Time { return time.Unix(0, 0) }, want: []testutil.Metric{}, @@ -50,7 +51,7 @@ func TestParse(t *testing.T) { Time: time.Unix(0, 0), }, }, - },*/ + }, { name: "logfmt parsers every line", bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), @@ -66,20 +67,20 @@ func TestParse(t *testing.T) { "method": "POST", "ts": "2018-07-24T19:43:40.275Z", "parent_id": "088876RL000", - "duration": "7.45", + "duration": 7.45, "log_id": "09R4e4Rl000", }, Time: time.Unix(0, 0), }, }, }, - /*{ + { name: "poorly formatted logfmt returns error", now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am garbage data.`), want: []testutil.Metric{}, wantErr: true, - },*/ + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -92,6 +93,7 @@ func TestParse(t *testing.T) { t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) return } + require.Equal(t, len(tt.want), len(got)) for i, m := range got { testutil.MustEqual(t, m, tt.want[i]) //log.Printf("Are they equal", t, tt.want[i], m) @@ -100,7 +102,7 @@ func TestParse(t *testing.T) { } } -/*func TestParseLine(t *testing.T) { +func TestParseLine(t *testing.T) { tests := []struct { name string s string @@ -166,4 +168,4 @@ func TestParse(t *testing.T) { } }) } -}*/ +} From 0589d50227c8bb475b80ee9f5cb95de2ddb28461 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Tue, 7 Aug 2018 17:47:37 -0600 Subject: [PATCH 08/21] Test with and without trailing end --- plugins/parsers/logfmt/parser_test.go | 32 ++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 44aa7f17128aa..21406d4c51a28 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -33,6 +33,36 @@ func TestParse(t *testing.T) { now: func() time.Time { return time.Unix(0, 0) }, want: []testutil.Metric{}, }, + /*{ + name: "test without trailing end" + bytes: []byte("foo=\"bar\"") + now func() time.Time { return time.Unix(0,0) }, + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "foo": "bar", + }, + Time: time.Unix(0,0), + }, + }, + }, + { + name: "test with trailing end" + bytes: []byte("foo=\"bar\"\n") + now func() time.Time { return time.Unix(0,0) }, + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "foo": "bar", + }, + Time: time.Unix(0,0), + }, + }, + },*/ { name: "logfmt parser returns all the fields", bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), @@ -53,7 +83,7 @@ func TestParse(t *testing.T) { }, }, { - name: "logfmt parsers every line", + name: "logfmt parses every line", bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", From a049014034886b0b70ba01313f653f88ba64c38b Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Wed, 8 Aug 2018 13:34:41 -0600 Subject: [PATCH 09/21] WIP: reformat code to fix No Metric In line and no bytes returns no metrics tests --- plugins/parsers/logfmt/parser.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index b97e8626fe5a0..412a88692b17d 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -38,13 +38,9 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) decoder := glogfmt.NewDecoder(reader) metrics := make([]telegraf.Metric, 0) - tags := make(map[string]string) - fields := make(map[string]interface{}) - counter := 0 for decoder.ScanRecord() { - counter++ - //tags := make(map[string]string) - //fields := make(map[string]interface{}) + tags := make(map[string]string) + fields := make(map[string]interface{}) for decoder.ScanKeyval() { if string(decoder.Value()) == "" { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) @@ -71,21 +67,13 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { } } log.Printf("All fields: %s", fields) - //m, err := metric.New(p.MetricName, tags, fields, p.Now()) - //log.Printf("Return all the info in metric", p.MetricName, tags, fields) - /* if err != nil { - log.Println("Error occurred") - return nil, err - } - metrics = append(metrics, m) - log.Printf("The final appended metrics %s", metrics) */ - } - if counter > 0 { m, err := metric.New(p.MetricName, tags, fields, p.Now()) + //log.Printf("Return all the info in metric", p.MetricName, tags, fields) if err != nil { log.Println("Error occurred") return nil, err } + //add default tags metrics = append(metrics, m) p.applyDefaultTags(metrics) From 4bfecde4e39caf085bf7b828bf6bed0bac144647 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Wed, 8 Aug 2018 13:49:18 -0600 Subject: [PATCH 10/21] Fix error so that logfmt parses every line test returns two Metrics --- plugins/parsers/logfmt/parser_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 21406d4c51a28..f1f92fcae6682 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -92,10 +92,17 @@ func TestParse(t *testing.T) { Measurement: "testlog", Tags: map[string]string{}, Fields: map[string]interface{}{ - "lvl": "info", - "msg": "http request", - "method": "POST", - "ts": "2018-07-24T19:43:40.275Z", + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + Time: time.Unix(0, 0), + }, + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ "parent_id": "088876RL000", "duration": 7.45, "log_id": "09R4e4Rl000", From 44f9e0616cd9f1d094cabb13dba08d75ae3cdba3 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Wed, 8 Aug 2018 14:01:23 -0600 Subject: [PATCH 11/21] Fix syntax in Test with and without trailing end --- plugins/parsers/logfmt/parser_test.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index f1f92fcae6682..d10be2c993476 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -33,36 +33,38 @@ func TestParse(t *testing.T) { now: func() time.Time { return time.Unix(0, 0) }, want: []testutil.Metric{}, }, - /*{ - name: "test without trailing end" - bytes: []byte("foo=\"bar\"") - now func() time.Time { return time.Unix(0,0) }, + { + name: "test without trailing end", + bytes: []byte("foo=\"bar\""), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", want: []testutil.Metric{ testutil.Metric{ Measurement: "testlog", Tags: map[string]string{}, Fields: map[string]interface{}{ - "foo": "bar", + "foo": "bar", }, - Time: time.Unix(0,0), + Time: time.Unix(0, 0), }, }, }, { - name: "test with trailing end" - bytes: []byte("foo=\"bar\"\n") - now func() time.Time { return time.Unix(0,0) }, + name: "test with trailing end", + bytes: []byte("foo=\"bar\"\n"), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", want: []testutil.Metric{ testutil.Metric{ Measurement: "testlog", Tags: map[string]string{}, Fields: map[string]interface{}{ - "foo": "bar", + "foo": "bar", }, - Time: time.Unix(0,0), + Time: time.Unix(0, 0), }, }, - },*/ + }, { name: "logfmt parser returns all the fields", bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), From 995e0680df8c965da8cceaf9b17f0c416a0d92ad Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Thu, 9 Aug 2018 10:42:52 -0600 Subject: [PATCH 12/21] Add additional documentation for logfmt parser --- docs/DATA_FORMATS_INPUT.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 066db9c1292b6..a91ac040b3414 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -888,6 +888,22 @@ the file output will only print once per `flush_interval`. - Continue one token at a time until the entire line is successfully parsed. # Logfmt -For extracting key-value pairs from log text in the form `=`. -At the moment, the plugin will produce one metric per line and all keys +This parser implements the logfmt format by extracting key-value pairs from log text in the form `=`. +At the moment, the plugin will produce one metric per line and all keys are added as fields. Values are left as strings (for now). +A typical log +``` +method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z +connect=4ms service=8ms status=200 bytes=1653 +``` +will be converted into +``` +"method": "GET" +"host": "influxdata.org", +"ts": "2018-07-24T19:43:40.275Z", +"connect": "4ms", +"service": "8ms", +"status": 200, +"bytes": 1653, +``` +Additional information about the logfmt format can be found [here](https://brandur.org/logfmt). From 594402d6f23b0173746ad378ce8f85c274c3ca16 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Fri, 10 Aug 2018 09:36:27 -0600 Subject: [PATCH 13/21] Remove logs and unnecessary comments --- plugins/parsers/logfmt/parser.go | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 412a88692b17d..0b3522c3cb6cd 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -1,4 +1,3 @@ -// Package logfmt converts logfmt data into metrics. New comment package logfmt import ( @@ -46,42 +45,31 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) } - //attempt type conversions + //type conversions value := string(decoder.Value()) if iValue, err := strconv.Atoi(value); err == nil { - //log.Printf("Print Atoi Value Here:", iValue) - //log.Printf("DECODER =", decoder.Key()) fields[string(decoder.Key())] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { - log.Printf("key:%s, value:%s", decoder.Key(), value) - //log.Printf("Print ParseFloat Value Here:", fValue) fields[string(decoder.Key())] = fValue } else if bValue, err := strconv.ParseBool(value); err == nil { - //log.Printf("Print ParseBool Value Here:", bValue) fields[string(decoder.Key())] = bValue } else { - log.Printf("key:%s, value:%s", decoder.Key(), value) - // log.Printf("Print Value Here:", value) fields[string(decoder.Key())] = value - //log.Printf("DECODER =", decoder.Key()) } } - log.Printf("All fields: %s", fields) m, err := metric.New(p.MetricName, tags, fields, p.Now()) - //log.Printf("Return all the info in metric", p.MetricName, tags, fields) if err != nil { log.Println("Error occurred") return nil, err } - //add default tags metrics = append(metrics, m) p.applyDefaultTags(metrics) } return metrics, nil } -// ParseLine converts a single line of text in logfmt to metrics. +// ParseLine converts a single line of text in logfmt format to metrics. func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(s)) if err != nil { @@ -89,7 +77,6 @@ func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { } if len(metrics) < 1 { - //if metrics[1] == nil { return nil, ErrNoMetric } return metrics[0], nil From b061a3e2def20d64f2a52da323545cae53e98011 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Fri, 10 Aug 2018 09:47:54 -0600 Subject: [PATCH 14/21] Fix parseline's multi parse line test --- plugins/parsers/logfmt/parser_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index d10be2c993476..a37cc1ef71c9c 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -177,8 +177,7 @@ func TestParseLine(t *testing.T) { name: "Log parser only returns metrics from first string", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000 "/n" - method=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000`, + s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", want: testutil.Metric{ Measurement: "testlog", Fields: map[string]interface{}{ From 40a6c4f9b3e1120f9533fbe5a1db1d206de1dd4f Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Fri, 10 Aug 2018 10:21:43 -0600 Subject: [PATCH 15/21] Use strconv.ParseInt for conversion --- plugins/parsers/logfmt/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 0b3522c3cb6cd..8e75eb9cbf5ef 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -47,7 +47,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { //type conversions value := string(decoder.Value()) - if iValue, err := strconv.Atoi(value); err == nil { + if iValue, err := strconv.ParseInt(value, 0, 64); err == nil { fields[string(decoder.Key())] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { fields[string(decoder.Key())] = fValue From cd24af80d5199490d7ff302a94f70f8c8c5328de Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Fri, 10 Aug 2018 10:22:05 -0600 Subject: [PATCH 16/21] Reformat --- plugins/parsers/logfmt/parser_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index a37cc1ef71c9c..26a2ef2c5187d 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -85,7 +85,7 @@ func TestParse(t *testing.T) { }, }, { - name: "logfmt parses every line", + name: "logfmt parser parses every line", bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", @@ -135,7 +135,6 @@ func TestParse(t *testing.T) { require.Equal(t, len(tt.want), len(got)) for i, m := range got { testutil.MustEqual(t, m, tt.want[i]) - //log.Printf("Are they equal", t, tt.want[i], m) } }) } @@ -174,7 +173,7 @@ func TestParseLine(t *testing.T) { }, }, { - name: "Log parser only returns metrics from first string", + name: "ParseLine only returns metrics from first string", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", From 98b9b17efa197268b37d8ba7367cb1fe7b618af6 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Mon, 13 Aug 2018 09:43:16 -0600 Subject: [PATCH 17/21] Refactor --- plugins/parsers/logfmt/parser.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 8e75eb9cbf5ef..27eefa773aee1 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -7,7 +7,7 @@ import ( "strconv" "time" - glogfmt "github.com/go-logfmt/logfmt" + "github.com/go-logfmt/logfmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" ) @@ -35,7 +35,7 @@ func NewParser(metricName string, defaultTags map[string]string) *Parser { // Parse converts a slice of bytes in logfmt format to metrics. func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) - decoder := glogfmt.NewDecoder(reader) + decoder := logfmt.NewDecoder(reader) metrics := make([]telegraf.Metric, 0) for decoder.ScanRecord() { tags := make(map[string]string) @@ -47,7 +47,7 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { //type conversions value := string(decoder.Value()) - if iValue, err := strconv.ParseInt(value, 0, 64); err == nil { + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { fields[string(decoder.Key())] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { fields[string(decoder.Key())] = fValue @@ -64,8 +64,8 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { } metrics = append(metrics, m) - p.applyDefaultTags(metrics) } + p.applyDefaultTags(metrics) return metrics, nil } From 8d31561e8dca9652c09da05a61a963f66f9f53aa Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Mon, 13 Aug 2018 09:47:59 -0600 Subject: [PATCH 18/21] update README for logfmt parser --- docs/DATA_FORMATS_INPUT.md | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index a91ac040b3414..db180cd6b074f 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -891,19 +891,21 @@ the file output will only print once per `flush_interval`. This parser implements the logfmt format by extracting key-value pairs from log text in the form `=`. At the moment, the plugin will produce one metric per line and all keys are added as fields. Values are left as strings (for now). -A typical log ``` -method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z -connect=4ms service=8ms status=200 bytes=1653 +[[inputs.exec]] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them [here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). + data_format = "logfmt" + ``` + +A typical log + +method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653 ``` will be converted into ``` -"method": "GET" -"host": "influxdata.org", -"ts": "2018-07-24T19:43:40.275Z", -"connect": "4ms", -"service": "8ms", -"status": 200, -"bytes": 1653, +method=GET, host=influxdata.org, ts=2018-07-24T19:43:40.275Z, connect=4ms, service=8ms, status=200, bytes=1653, ``` Additional information about the logfmt format can be found [here](https://brandur.org/logfmt). From e75dec437410e219a34227f28500b24367549f97 Mon Sep 17 00:00:00 2001 From: Ayrdrie Palmer Date: Fri, 17 Aug 2018 08:19:19 -0600 Subject: [PATCH 19/21] Add requested changes --- docs/DATA_FORMATS_INPUT.md | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index db180cd6b074f..a6254e41d7918 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -888,24 +888,17 @@ the file output will only print once per `flush_interval`. - Continue one token at a time until the entire line is successfully parsed. # Logfmt -This parser implements the logfmt format by extracting key-value pairs from log text in the form `=`. +This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `=`. At the moment, the plugin will produce one metric per line and all keys -are added as fields. Values are left as strings (for now). -``` -[[inputs.exec]] - - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them [here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). - data_format = "logfmt" - ``` - +are added as fields. A typical log - -method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653 +``` +method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z +connect=4ms service=8ms status=200 bytes=1653 ``` will be converted into ``` -method=GET, host=influxdata.org, ts=2018-07-24T19:43:40.275Z, connect=4ms, service=8ms, status=200, bytes=1653, +logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i + ``` Additional information about the logfmt format can be found [here](https://brandur.org/logfmt). From ff03ce2dbba07b59f18007e3dbd14a22b3ea02e2 Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 21 Aug 2018 14:45:05 -0700 Subject: [PATCH 20/21] better error messages --- plugins/parsers/logfmt/parser.go | 21 +++++++++++++------ plugins/parsers/logfmt/parser_test.go | 29 ++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 27eefa773aee1..603dbbae862b9 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -3,7 +3,6 @@ package logfmt import ( "bytes" "fmt" - "log" "strconv" "time" @@ -37,12 +36,19 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { reader := bytes.NewReader(b) decoder := logfmt.NewDecoder(reader) metrics := make([]telegraf.Metric, 0) - for decoder.ScanRecord() { - tags := make(map[string]string) + for { + ok := decoder.ScanRecord() + if !ok { + err := decoder.Err() + if err != nil { + return nil, err + } + break + } fields := make(map[string]interface{}) for decoder.ScanKeyval() { if string(decoder.Value()) == "" { - return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) + continue } //type conversions @@ -57,9 +63,12 @@ func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { fields[string(decoder.Key())] = value } } - m, err := metric.New(p.MetricName, tags, fields, p.Now()) + if len(fields) == 0 { + continue + } + + m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now()) if err != nil { - log.Println("Error occurred") return nil, err } diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 26a2ef2c5187d..c9096468467dc 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -114,11 +114,34 @@ func TestParse(t *testing.T) { }, }, { - name: "poorly formatted logfmt returns error", + name: "keys without = or values are ignored", now: func() time.Time { return time.Unix(0, 0) }, - bytes: []byte(`i am garbage data.`), + bytes: []byte(`i am no data.`), want: []testutil.Metric{}, - wantErr: true, + wantErr: false, + }, + { + name: "keys without values are ignored", + now: func() time.Time { return time.Unix(0, 0) }, + bytes: []byte(`foo="" bar=`), + want: []testutil.Metric{}, + wantErr: false, + }, + { + name: "unterminated quote produces error", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + bytes: []byte(`bar=baz foo="bar`), + want: []testutil.Metric{}, + wantErr: true, + }, + { + name: "malformed key", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + bytes: []byte(`"foo=" bar=baz`), + want: []testutil.Metric{}, + wantErr: true, }, } for _, tt := range tests { From a3f333118e65534362b866b7a0fa8a76f6db2fb9 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 21 Aug 2018 15:36:43 -0700 Subject: [PATCH 21/21] Fix conflict markers --- docs/DATA_FORMATS_INPUT.md | 3 --- plugins/parsers/registry.go | 5 ++--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index d519292124981..7f7c94930e1e6 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,11 +10,8 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](#collectd) 1. [Dropwizard](#dropwizard) 1. [Grok](#grok) -<<<<<<< HEAD 1. [Logfmt](#logfmt) -======= 1. [Wavefront](#wavefront) ->>>>>>> master Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 424ebddc838d5..e198cb2cb96c6 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -245,12 +245,11 @@ func NewDropwizardParser( return parser, err } -<<<<<<< HEAD // NewLogFmtParser returns a logfmt parser with the default options. func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) { return logfmt.NewParser(metricName, defaultTags), nil -======= +} + func NewWavefrontParser(defaultTags map[string]string) (Parser, error) { return wavefront.NewWavefrontParser(defaultTags), nil ->>>>>>> master }