From 03a0e4c1039e35482d061b84a4bf17a37683a72c Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Mon, 18 Jul 2016 14:44:25 +0100 Subject: [PATCH] Internally name all patterns for log parsing flexibility closes #1436 This also fixes the bad behavior of waiting until runtime to return log parsing pattern compile errors when a pattern was simply unfound. --- plugins/inputs/logparser/grok/grok.go | 12 ++++++- plugins/inputs/logparser/grok/grok_test.go | 39 ++++++++++++++++++++-- plugins/inputs/logparser/logparser.go | 21 ++++++------ 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go index 16e62b2231653..e5fe800626a9c 100644 --- a/plugins/inputs/logparser/grok/grok.go +++ b/plugins/inputs/logparser/grok/grok.go @@ -98,13 +98,23 @@ func (p *Parser) Compile() error { return err } - p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns + // Give Patterns fake names so that they can be treated as named + // "custom patterns" + for i, pattern := range p.Patterns { + name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i) + p.CustomPatterns += "\n" + name + " " + pattern + "\n" + p.Patterns[i] = "%{" + name + "}" + } + // Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse + // them together as the same type of pattern. + p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns if len(p.CustomPatterns) != 0 { scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) p.addCustomPatterns(scanner) } + // Parse any custom pattern files supplied. for _, filename := range p.CustomPatternFiles { file, err := os.Open(filename) if err != nil { diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go index 979553f88ac11..385a2fcd3be3e 100644 --- a/plugins/inputs/logparser/grok/grok_test.go +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -212,7 +212,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) { func TestCompileStringAndParse(t *testing.T) { p := &Parser{ - Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + Patterns: []string{"%{TEST_LOG_A}"}, CustomPatterns: ` DURATION %{NUMBER}[nuµm]?s RESPONSE_CODE %{NUMBER:response_code:tag} @@ -235,6 +235,41 @@ func TestCompileStringAndParse(t *testing.T) { assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags()) } +func TestCompileErrorsOnInvalidPattern(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"}, + CustomPatterns: ` + DURATION %{NUMBER}[nuµm]?s + RESPONSE_CODE %{NUMBER:response_code:tag} + RESPONSE_TIME %{DURATION:response_time:duration} + TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME} + `, + } + assert.Error(t, p.Compile()) + + metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`) + require.Nil(t, metricA) +} + +func TestParsePatternsWithoutCustom(t *testing.T) { + p := &Parser{ + Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"}, + } + assert.NoError(t, p.Compile()) + + metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`) + require.NotNil(t, metricA) + assert.NoError(t, err) + assert.Equal(t, + map[string]interface{}{ + "response_time": int64(20821), + "metric": float64(10890.645), + }, + metricA.Fields()) + assert.Equal(t, map[string]string{}, metricA.Tags()) + assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time()) +} + func TestParseEpochNano(t *testing.T) { p := &Parser{ Patterns: []string{"%{MYAPP}"}, @@ -418,7 +453,7 @@ func TestParseErrors(t *testing.T) { TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{} `, } - assert.NoError(t, p.Compile()) + assert.Error(t, p.Compile()) _, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`) assert.Error(t, err) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 4737ace6536d4..28eabcd8ec79c 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -9,6 +9,7 @@ import ( "github.com/hpcloud/tail" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } // compile log parser patterns: + errChan := errchan.New(len(l.parsers)) for _, parser := range l.parsers { if err := parser.Compile(); err != nil { - return err + errChan.C <- err } } + if err := errChan.Error(); err != nil { + return err + } var seek tail.SeekInfo if !l.FromBeginning { @@ -125,12 +130,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - var errS string + errChan := errchan.New(len(l.Files)) // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) if err != nil { log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + continue } for file, _ := range g.Match() { tailer, err := tail.TailFile(file, @@ -139,10 +145,8 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { Follow: true, Location: &seek, }) - if err != nil { - errS += err.Error() + " " - continue - } + errChan.C <- err + // create a goroutine for each "tailer" l.wg.Add(1) go l.receiver(tailer) @@ -150,10 +154,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } } - if errS != "" { - return fmt.Errorf(errS) - } - return nil + return errChan.Error() } // receiver is launched as a goroutine to continuously watch a tailed logfile