diff --git a/plugins/processors/execd/execd.go b/plugins/processors/execd/execd.go index 7aeb285a44fc5..3d11bac4969fe 100644 --- a/plugins/processors/execd/execd.go +++ b/plugins/processors/execd/execd.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/process" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -117,6 +118,12 @@ func (e *Execd) Stop() error { } func (e *Execd) cmdReadOut(out io.Reader) { + // Prefer using the StreamParser when parsing influx format. + if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser { + e.cmdReadOutStream(out) + return + } + scanner := bufio.NewScanner(out) scanBuf := make([]byte, 4096) scanner.Buffer(scanBuf, 262144) @@ -137,6 +144,33 @@ func (e *Execd) cmdReadOut(out io.Reader) { } } +func (e *Execd) cmdReadOutStream(out io.Reader) { + parser := influx.NewStreamParser(out) + + for { + metric, err := parser.Next() + + if err != nil { + // Stop parsing when we've reached the end. + if err == influx.EOF { + break + } + + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + // Continue past parse errors. + e.acc.AddError(parseErr) + continue + } + + // Stop reading on any non-recoverable error. + e.acc.AddError(err) + return + } + + e.acc.AddMetric(metric) + } +} + func (e *Execd) cmdReadErr(out io.Reader) { scanner := bufio.NewScanner(out) diff --git a/plugins/processors/execd/execd_test.go b/plugins/processors/execd/execd_test.go index 451669ec6a130..3cccc9fbb156e 100644 --- a/plugins/processors/execd/execd_test.go +++ b/plugins/processors/execd/execd_test.go @@ -79,6 +79,56 @@ func TestExternalProcessorWorks(t *testing.T) { } } +func TestParseLinesWithNewLines(t *testing.T) { + e := New() + e.Log = testutil.Logger{} + + exe, err := os.Executable() + require.NoError(t, err) + t.Log(exe) + e.Command = []string{exe, "-countmultiplier"} + e.RestartDelay = config.Duration(5 * time.Second) + + acc := &testutil.Accumulator{} + + require.NoError(t, e.Start(acc)) + + now := time.Now() + orig := now + + m, err := metric.New("test", + map[string]string{ + "author": "Mr. Gopher", + }, + map[string]interface{}{ + "phrase": "Gophers are amazing creatures.\nAbsolutely amazing.", + "count": 3, + }, + now) + + require.NoError(t, err) + + e.Add(m, acc) + + acc.Wait(1) + require.NoError(t, e.Stop()) + + processedMetric := acc.GetTelegrafMetrics()[0] + + expectedMetric := testutil.MustMetric("test", + map[string]string{ + "author": "Mr. Gopher", + }, + map[string]interface{}{ + "phrase": "Gophers are amazing creatures.\nAbsolutely amazing.", + "count": 6, + }, + orig, + ) + + testutil.RequireMetricEqual(t, expectedMetric, processedMetric) +} + var countmultiplier = flag.Bool("countmultiplier", false, "if true, act like line input program instead of test")