From 841966e242279f99f1b2aa17ab8e50f5dac6eb86 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 16 Jun 2021 11:44:31 -0400 Subject: [PATCH] Fix issue where log entry could be duplicated on parse error (#188) In some circumstances, parsing a log entry can fail. By design, a failed entry may still be "sent" to the next operator, but once that happens the parser should cease execution immediately. Prior to this change, this was not always the case: some parsers would continue on and "send" the entry again. --- operator/builtin/parser/csv/csv_test.go | 4 +- operator/helper/parser_test.go | 108 +++++++++++++++++------- operator/helper/transformer.go | 1 - operator/helper/transformer_test.go | 66 ++++++++------- testutil/mocks.go | 10 +++ 5 files changed, 126 insertions(+), 63 deletions(-) diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index ed52e74b7afa..3400896cd950 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) { entry := entry.New() entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent" err = op.Process(context.Background(), entry) - require.Nil(t, err, "Expected to parse a single csv record, got '2'") require.NoError(t, err) fake.ExpectBody(t, map[string]interface{}{ "name": "stanza", "sev": "INFO", "msg": "started agent", }) + fake.ExpectNoEntry(t, 100*time.Millisecond) }) } @@ -260,7 +260,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) { entry := entry.New() entry.Body = "{\"name\": \"stanza\"}" err = op.Process(context.Background(), entry) - require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") + require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") fake.ExpectBody(t, "{\"name\": \"stanza\"}") }) } diff --git a/operator/helper/parser_test.go 
b/operator/helper/parser_test.go index eea59aa87d29..8fdb1593c83e 100644 --- a/operator/helper/parser_test.go +++ b/operator/helper/parser_test.go @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) { require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.") } -func TestParserInvalidParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), } @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidTimeParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + } + parse := func(i interface{}) (interface{}, error) { + return i, fmt.Errorf("parse failure") + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 
100*time.Millisecond) +} + +func TestParserInvalidTimeParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), ParseTo: entry.NewBodyField(), @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidSeverityParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidTimeParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + ParseTo: entry.NewBodyField(), + TimeParser: &TimeParser{ + ParseFrom: func() *entry.Field { + f := entry.NewBodyField("missing-key") + return &f + }(), + }, + } + parse := func(i interface{}) (interface{}, error) { + return i, nil + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} +func TestParserInvalidSeverityParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, SeverityParser: &SeverityParser{ ParseFrom: 
entry.NewBodyField("missing-key"), @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } func TestParserInvalidTimeValidSeverityParse(t *testing.T) { @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, except, actual) } + +func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) { + buildContext := testutil.NewBuildContext(t) + fakeOut := testutil.NewFakeOutput(t) + writer := &WriterOperator{ + BasicOperator: BasicOperator{ + OperatorID: "test-id", + OperatorType: "test-type", + SugaredLogger: buildContext.Logger.SugaredLogger, + }, + OutputIDs: []string{fakeOut.ID()}, + } + writer.SetOutputs([]operator.Operator{fakeOut}) + return writer, fakeOut +} diff --git a/operator/helper/transformer.go b/operator/helper/transformer.go index 61051df06be8..d492039cb1c7 100644 --- a/operator/helper/transformer.go +++ b/operator/helper/transformer.go @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry)) if t.OnError == SendOnError { t.Write(ctx, entry) - return nil } return err } diff --git a/operator/helper/transformer_test.go b/operator/helper/transformer_test.go index be01ff4d2e85..ea863d39eda8 100644 --- a/operator/helper/transformer_test.go +++ b/operator/helper/transformer_test.go @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) { } err := transformer.ProcessWith(ctx, testEntry, transform) - require.NoError(t, err) + require.Error(t, err) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t 
*testing.T) { func TestTransformerIf(t *testing.T) { cases := []struct { - name string - ifExpr string - inputBody string - expected string + name string + ifExpr string + inputBody string + expected string + errExpected bool }{ { - "NoIf", - "", - "test", - "parsed", + name: "NoIf", + ifExpr: "", + inputBody: "test", + expected: "parsed", }, { - "TrueIf", - "true", - "test", - "parsed", + name: "TrueIf", + ifExpr: "true", + inputBody: "test", + expected: "parsed", }, { - "FalseIf", - "false", - "test", - "test", + name: "FalseIf", + ifExpr: "false", + inputBody: "test", + expected: "test", }, { - "EvaluatedTrue", - "$body == 'test'", - "test", - "parsed", + name: "EvaluatedTrue", + ifExpr: "$body == 'test'", + inputBody: "test", + expected: "parsed", }, { - "EvaluatedFalse", - "$body == 'notest'", - "test", - "test", + name: "EvaluatedFalse", + ifExpr: "$body == 'notest'", + inputBody: "test", + expected: "test", }, { - "FailingExpressionEvaluation", - "$body.test.noexist == 'notest'", - "test", - "test", + name: "FailingExpressionEvaluation", + ifExpr: "$body.test.noexist == 'notest'", + inputBody: "test", + expected: "test", + errExpected: true, }, } @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) { e.Body = "parsed" return nil }) - require.NoError(t, err) + if tc.errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } fake.ExpectBody(t, tc.expected) }) diff --git a/testutil/mocks.go b/testutil/mocks.go index 8698167b3af0..6c31e390473b 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) { require.FailNow(t, "Timed out waiting for entry") } } + +// ExpectNoEntry expects that no entry will be received within the specified time +func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) { + select { + case <-f.Received: + require.FailNow(t, "Should not have received entry") + case <-time.After(timeout): + return + } +}