Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parsing multiple metrics on the first line of tailed file #6289

Merged
merged 2 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 42 additions & 33 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
)

const (
Expand Down Expand Up @@ -172,55 +172,64 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {

// create a goroutine for each "tailer"
t.wg.Add(1)
go t.receiver(parser, tailer)
go func() {
defer t.wg.Done()
t.receiver(parser, tailer)
}()
t.tailers[tailer.Filename] = tailer
}
}
return nil
}

// this is launched as a goroutine to continuously watch a tailed logfile
// ParseLine parses a line of text.
func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) {
switch parser.(type) {
case *csv.Parser:
// The csv parser parses headers in Parse and skips them in ParseLine.
// As a temporary solution call Parse only when getting the first
// line from the file.
if firstLine {
return parser.Parse([]byte(line))
} else {
m, err := parser.ParseLine(line)
if err != nil {
return nil, err
}

if m != nil {
return []telegraf.Metric{m}, nil
}
return []telegraf.Metric{}, nil
}
default:
return parser.Parse([]byte(line))
}
}

// Receiver is launched as a goroutine to continuously watch a tailed logfile
// for changes, parse any incoming msgs, and add to the accumulator.
func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
defer t.wg.Done()

var firstLine = true
var metrics []telegraf.Metric
var m telegraf.Metric
var err error
var line *tail.Line
for line = range tailer.Lines {
for line := range tailer.Lines {
if line.Err != nil {
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, line.Err))
continue
}
// Fix up files with Windows line endings.
text := strings.TrimRight(line.Text, "\r")

if firstLine {
metrics, err = parser.Parse([]byte(text))
if err == nil {
if len(metrics) == 0 {
firstLine = false
continue
} else {
m = metrics[0]
}
}
firstLine = false
} else {
m, err = parser.ParseLine(text)
}

if err == nil {
if m != nil {
tags := m.Tags()
tags["path"] = tailer.Filename
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
metrics, err := parseLine(parser, text, firstLine)
if err != nil {
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
tailer.Filename, line.Text, err))
continue
}
firstLine = false

for _, metric := range metrics {
metric.AddTag("path", tailer.Filename)
t.acc.AddMetric(metric)
}
}

Expand Down
119 changes: 118 additions & 1 deletion plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -139,3 +142,117 @@ func TestTailDosLineendings(t *testing.T) {
"usage_idle": float64(200),
})
}

// The csv parser should only parse the header line once per file.
func TestCSVHeadersParsedOnce(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer func() {
tmpfile.Close()
os.Remove(tmpfile.Name())
}()

_, err = tmpfile.WriteString(`
measurement,time_idle
cpu,42
cpu,42
`)
require.NoError(t, err)

plugin := NewTail()
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return &csv.Parser{
MeasurementColumn: "measurement",
HeaderRowCount: 1,
TimeFunc: func() time.Time { return time.Unix(0, 0) },
}, nil
})
defer plugin.Stop()

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
plugin.Stop()

expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42,
"measurement": "cpu",
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42,
"measurement": "cpu",
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

// Ensure that the first line can produce multiple metrics (#6138)
func TestMultipleMetricsOnFirstLine(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer func() {
tmpfile.Close()
os.Remove(tmpfile.Name())
}()

_, err = tmpfile.WriteString(`
[{"time_idle": 42}, {"time_idle": 42}]
`)
require.NoError(t, err)

plugin := NewTail()
plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()}
plugin.SetParserFunc(func() (parsers.Parser, error) {
return json.New(
&json.Config{
MetricName: "cpu",
})
})
defer plugin.Stop()

acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
plugin.Stop()

expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime())
}