Skip to content

Commit

Permalink
Internally name all patterns for log parsing flexibility
Browse files Browse the repository at this point in the history
closes #1436

This also fixes the bad behavior of waiting until runtime to return log
parsing pattern compile errors when a pattern was simply unfound.
  • Loading branch information
sparrc committed Jul 18, 2016
1 parent 1c29657 commit 03a0e4c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 13 deletions.
12 changes: 11 additions & 1 deletion plugins/inputs/logparser/grok/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,23 @@ func (p *Parser) Compile() error {
return err
}

p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
for i, pattern := range p.Patterns {
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.Patterns[i] = "%{" + name + "}"
}

// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}

// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename)
if err != nil {
Expand Down
39 changes: 37 additions & 2 deletions plugins/inputs/logparser/grok/grok_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {

func TestCompileStringAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
Patterns: []string{"%{TEST_LOG_A}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
Expand All @@ -235,6 +235,41 @@ func TestCompileStringAndParse(t *testing.T) {
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
}

func TestCompileErrorsOnInvalidPattern(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
assert.Error(t, p.Compile())

metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
require.Nil(t, metricA)
}

func TestParsePatternsWithoutCustom(t *testing.T) {
p := &Parser{
Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
}
assert.NoError(t, p.Compile())

metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
}

func TestParseEpochNano(t *testing.T) {
p := &Parser{
Patterns: []string{"%{MYAPP}"},
Expand Down Expand Up @@ -418,7 +453,7 @@ func TestParseErrors(t *testing.T) {
TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
`,
}
assert.NoError(t, p.Compile())
assert.Error(t, p.Compile())
_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
assert.Error(t, err)

Expand Down
21 changes: 11 additions & 10 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hpcloud/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"

Expand Down Expand Up @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}

// compile log parser patterns:
errChan := errchan.New(len(l.parsers))
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
return err
errChan.C <- err
}
}
if err := errChan.Error(); err != nil {
return err
}

var seek tail.SeekInfo
if !l.FromBeginning {
Expand All @@ -125,12 +130,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

var errS string
errChan := errchan.New(len(l.Files))
// Create a "tailer" for each file
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
continue
}
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
Expand All @@ -139,21 +145,16 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
Follow: true,
Location: &seek,
})
if err != nil {
errS += err.Error() + " "
continue
}
errChan.C <- err

// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
}
}

if errS != "" {
return fmt.Errorf(errS)
}
return nil
return errChan.Error()
}

// receiver is launched as a goroutine to continuously watch a tailed logfile
Expand Down

0 comments on commit 03a0e4c

Please sign in to comment.