From 49dedbe8ba8e7a3c9a5e75d4a18e519f6fc44903 Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Fri, 11 Oct 2024 20:02:05 +0530 Subject: [PATCH 01/16] [pkg/stanza] Use HandleEntryError for error handling --- .../operator/parser/container/parser.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 5c33005435f4..aa2f50f9600a 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -17,6 +17,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + stanzaerr "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -73,7 +74,7 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { if format == "" { format, err = p.detectFormat(entry) if err != nil { - return fmt.Errorf("failed to detect a valid container log format: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "invalid container log format")) } } @@ -81,12 +82,12 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { case dockerFormat: err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings) if err != nil { - return fmt.Errorf("failed to process the docker log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "docker log")) } timeLayout = goTimeLayout err = parseTime(entry, timeLayout) if err != nil { - return fmt.Errorf("failed to parse time: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse time")) } case containerdFormat, crioFormat: p.criConsumerStartOnce.Do(func() { @@ -117,35 +118,35 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { // parse the message err = p.ParserOperator.ParseWith(ctx, entry, p.parseContainerd) if err != nil { - return fmt.Errorf("failed to parse containerd log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "containerd log")) } timeLayout = goTimeLayout } else { // parse the message err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO) if err != nil { - return fmt.Errorf("failed to parse crio log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse crio log")) } timeLayout = crioTimeLayout } err = parseTime(entry, timeLayout) if err != nil { - return fmt.Errorf("failed to parse time: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse time")) } err = p.handleAttributeMappings(entry) if err != nil { - return fmt.Errorf("failed to handle attribute mappings: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "attribute mappings")) } // send it to the recombine operator err = p.recombineParser.Process(ctx, entry) if err != nil { - return fmt.Errorf("failed to recombine the crio log: %w", err) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "recombine the crio log")) } default: - return fmt.Errorf("failed to detect a valid container log format") + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "invalid container log format")) } return nil From 88b2d06166fc1262b35e5d7936621107bdc30c7f Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 10:51:41 +0530 Subject: [PATCH 02/16] add test case for send_quiet --- .../operator/parser/container/parser_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index 9c684e74d31a..62d73ff5d3c6 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -88,6 +88,24 @@ func TestInternalRecombineCfg(t *testing.T) { require.Equal(t, expected, cfg) } +func TestProcessOnErrorSendQuiet(t *testing.T) { + cfg := NewConfigWithID("test_id") + cfg.AddMetadataFromFilePath = false + cfg.OnError = "send_quiet" + set := componenttest.NewNopTelemetrySettings() + op, err := cfg.Build(set) + require.NoError(t, err, "incorrect build") + + // contains incorrect time format + faultyEntry := &entry.Entry{ + Body: `{"log":"INFO: log line here","stream":"stdout","time":"2023033"}`, + } + + err = op.Process(context.Background(), faultyEntry) + // Error is not logged + require.NoError(t, err, "send_quiet is not working properly") +} + func TestProcess(t *testing.T) { cases := []struct { name string From cd813f7808f9e251fe30bd285c71e46218860c4c Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 10:59:14 +0530 Subject: [PATCH 03/16] cosmetic changes --- pkg/stanza/operator/parser/container/parser_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index 62d73ff5d3c6..f28c5499c687 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -90,7 +90,6 @@ func TestInternalRecombineCfg(t *testing.T) { func TestProcessOnErrorSendQuiet(t *testing.T) { cfg := NewConfigWithID("test_id") - cfg.AddMetadataFromFilePath = false cfg.OnError = "send_quiet" set := componenttest.NewNopTelemetrySettings() op, err := cfg.Build(set) @@ -102,7 +101,7 @@ func TestProcessOnErrorSendQuiet(t *testing.T) { } err = op.Process(context.Background(), faultyEntry) - // Error is not logged + // Error should not be logged require.NoError(t, err, "send_quiet is not working properly") } From 5a2055c86c8a91940c8c7c96fd664c8fe2dfaaaa Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 11:07:07 +0530 Subject: [PATCH 04/16] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6252c925d4c..2268c2ec9d16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ arrow.waiter_limit -> admission.waiter_limit - `sqlqueryreceiver`: Fix reprocessing of logs when tracking_column type is timestamp (#35194) - `windowseventlogreceiver`: While collecting from a remote windows host, the stanza operator will no longer log "subscription handle is already open" constantly during successful collection. (#35520) - `windowseventlogreceiver`: If collecting from a remote host, the receiver will stop collecting if the host restarts. This change resubscribes when the host restarts. (#35175) +- `pkg/stanza`: Use HandleEntryError for error handling. (#35758) ## v0.110.0 From ae98ac64f9cadfb7dd679435533a024b496102a4 Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 11:43:12 +0530 Subject: [PATCH 05/16] add changelog --- .chloggen/pkg-stanza-error-handling.yaml | 27 ++++++++++++++++++++++++ CHANGELOG.md | 1 - 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 .chloggen/pkg-stanza-error-handling.yaml diff --git a/.chloggen/pkg-stanza-error-handling.yaml b/.chloggen/pkg-stanza-error-handling.yaml new file mode 100644 index 000000000000..976d2e72654b --- /dev/null +++ b/.chloggen/pkg-stanza-error-handling.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: uses `HandleEntryError` method for error handling + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35726] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/CHANGELOG.md b/CHANGELOG.md index 2268c2ec9d16..b6252c925d4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,7 +95,6 @@ arrow.waiter_limit -> admission.waiter_limit - `sqlqueryreceiver`: Fix reprocessing of logs when tracking_column type is timestamp (#35194) - `windowseventlogreceiver`: While collecting from a remote windows host, the stanza operator will no longer log "subscription handle is already open" constantly during successful collection. (#35520) - `windowseventlogreceiver`: If collecting from a remote host, the receiver will stop collecting if the host restarts. This change resubscribes when the host restarts. (#35175) -- `pkg/stanza`: Use HandleEntryError for error handling. (#35758) ## v0.110.0 From b0c7a382d05f49aed2ad4706408f51ebd6667458 Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 12:40:40 +0530 Subject: [PATCH 06/16] test reformat --- .../operator/parser/container/parser_test.go | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index f28c5499c687..248e378bfd3b 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -5,6 +5,7 @@ package container import ( "context" + "fmt" "testing" "time" @@ -89,20 +90,32 @@ func TestInternalRecombineCfg(t *testing.T) { } func TestProcessOnErrorSendQuiet(t *testing.T) { - cfg := NewConfigWithID("test_id") - cfg.OnError = "send_quiet" - set := componenttest.NewNopTelemetrySettings() - op, err := cfg.Build(set) - require.NoError(t, err, "incorrect build") - // contains incorrect time format faultyEntry := &entry.Entry{ Body: `{"log":"INFO: log line here","stream":"stdout","time":"2023033"}`, } - err = op.Process(context.Background(), faultyEntry) - // Error should not be logged - require.NoError(t, err, "send_quiet is not working properly") + cfg := NewConfigWithID("test_id") + cfg.AddMetadataFromFilePath = false + set := componenttest.NewNopTelemetrySettings() + + t.Run("without send_quiet", func(t *testing.T) { + cfg.OnError = "send" + op, err := cfg.Build(set) + require.NoError(t, err, "incorrect build") + err = op.Process(context.Background(), faultyEntry) + fmt.Println(err) + require.ErrorContains(t, err, "parse time", "on_error is not working correctly") + }) + + t.Run("with send_quiet", func(t *testing.T) { + cfg.OnError = "send_quiet" + op, err := cfg.Build(set) + require.NoError(t, err, "incorrect build") + err = op.Process(context.Background(), faultyEntry) + require.NoError(t, err, "send_quiet is not working correctly") + }) + } func TestProcess(t *testing.T) { From 293f94c58411d4f1fb3002bf558678b1bf4e906c Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 15 Oct 2024 12:41:47 +0530 Subject: [PATCH 07/16] more test correction --- pkg/stanza/operator/parser/container/parser_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index 248e378bfd3b..a4f5377f5827 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -5,7 +5,6 @@ package container import ( "context" - "fmt" "testing" "time" @@ -100,11 +99,9 @@ func TestProcessOnErrorSendQuiet(t *testing.T) { set := componenttest.NewNopTelemetrySettings() t.Run("without send_quiet", func(t *testing.T) { - cfg.OnError = "send" op, err := cfg.Build(set) require.NoError(t, err, "incorrect build") err = op.Process(context.Background(), faultyEntry) - fmt.Println(err) require.ErrorContains(t, err, "parse time", "on_error is not working correctly") }) From d8a33b390c41b1722a1999770d3af2029d3bed7e Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Mon, 21 Oct 2024 11:25:10 +0530 Subject: [PATCH 08/16] add unhandled error --- pkg/stanza/operator/helper/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index 56b2eaccd106..dd51ff17ce99 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -114,7 +114,7 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E if cb != nil { err = cb(entry) if err != nil { - return err + return p.HandleEntryError(ctx, entry, err) } } From 833fdfe5c8e1fe85746f7c5e4ec84d2ba7c9510f Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Tue, 22 Oct 2024 16:26:19 +0530 Subject: [PATCH 09/16] added parseWithCallback method --- pkg/stanza/operator/helper/parser.go | 23 ++++++++ .../operator/parser/container/parser.go | 58 +++++++++++-------- .../operator/parser/container/parser_test.go | 3 +- 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index dd51ff17ce99..cd178be222ff 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -121,6 +121,29 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E return p.Write(ctx, entry) } +func (p *ParserOperator) ParseWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error { + // Short circuit if the "if" condition does not match + skip, err := p.Skip(ctx, entry) + if err != nil { + return p.HandleEntryError(ctx, entry, err) + } + if skip { + return p.Write(ctx, entry) + } + + if err = p.ParseWith(ctx, entry, parse); err != nil { + return err + } + if cb != nil { + err = cb(entry) + if err != nil { + return p.HandleEntryError(ctx, entry, err) + } + } + + return nil +} + // ParseWith will process an entry's field with a parser function. func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error { value, ok := entry.Get(p.ParseFrom) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index aa2f50f9600a..c4b238b4a798 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -68,7 +68,6 @@ type Parser struct { // Process will parse an entry of Container logs func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { - var timeLayout string format := p.format if format == "" { @@ -80,15 +79,8 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { switch format { case dockerFormat: - err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings) - if err != nil { - return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "docker log")) - } - timeLayout = goTimeLayout - err = parseTime(entry, timeLayout) - if err != nil { - return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse time")) - } + return p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.cbGoTimeLayout) + case containerdFormat, crioFormat: p.criConsumerStartOnce.Do(func() { err = p.criLogEmitter.Start(nil) @@ -116,28 +108,17 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { if format == containerdFormat { // parse the message - err = p.ParserOperator.ParseWith(ctx, entry, p.parseContainerd) + err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseContainerd, p.cbGoTimeLayout) if err != nil { return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "containerd log")) } - timeLayout = goTimeLayout + } else { // parse the message - err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO) + err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseCRIO, p.cbCrioTimeLayout) if err != nil { return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse crio log")) } - timeLayout = crioTimeLayout - } - - err = parseTime(entry, timeLayout) - if err != nil { - return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse time")) - } - - err = p.handleAttributeMappings(entry) - if err != nil { - return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "attribute mappings")) } // send it to the recombine operator @@ -266,6 +247,35 @@ func (p *Parser) handleAttributeMappings(e *entry.Entry) error { return nil } +func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { + timeLayout := goTimeLayout + err := parseTime(e, timeLayout) + if err != nil { + return fmt.Errorf("failed to parse time: %w", err) + } + + err = p.handleAttributeMappings(e) + if err != nil { + return fmt.Errorf("failed to handle attribute mappings: %w", err) + } + + return nil +} + +func (p *Parser) cbCrioTimeLayout(e *entry.Entry) error { + timeLayout := crioTimeLayout + err := parseTime(e, timeLayout) + if err != nil { + return fmt.Errorf("failed to parse time: %w", err) + } + + err = p.handleAttributeMappings(e) + if err != nil { + return fmt.Errorf("failed to handle attribute mappings: %w", err) + } + return nil +} + // handleMoveAttributes moves fields to final attributes func (p *Parser) handleMoveAttributes(e *entry.Entry) error { // move `log` to `body` explicitly first to avoid diff --git a/pkg/stanza/operator/parser/container/parser_test.go b/pkg/stanza/operator/parser/container/parser_test.go index a4f5377f5827..75a01e0a0f1c 100644 --- a/pkg/stanza/operator/parser/container/parser_test.go +++ b/pkg/stanza/operator/parser/container/parser_test.go @@ -93,6 +93,7 @@ func TestProcessOnErrorSendQuiet(t *testing.T) { faultyEntry := &entry.Entry{ Body: `{"log":"INFO: log line here","stream":"stdout","time":"2023033"}`, } + expectedError := "parsing time \"2023033\" as \"2006-01-02T15:04:05.999Z\": cannot parse \"033\" as \"-\"" cfg := NewConfigWithID("test_id") cfg.AddMetadataFromFilePath = false @@ -102,7 +103,7 @@ func TestProcessOnErrorSendQuiet(t *testing.T) { op, err := cfg.Build(set) require.NoError(t, err, "incorrect build") err = op.Process(context.Background(), faultyEntry) - require.ErrorContains(t, err, "parse time", "on_error is not working correctly") + require.ErrorContains(t, err, expectedError, "on_error is not working correctly") }) t.Run("with send_quiet", func(t *testing.T) { From 1abea54046837fe0edd6804bdd9524644c780a10 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 23 Oct 2024 09:28:41 +0530 Subject: [PATCH 10/16] Update pkg/stanza/operator/parser/container/parser.go Co-authored-by: Christos Markou --- pkg/stanza/operator/parser/container/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index c4b238b4a798..98e302b04f0c 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -124,7 +124,7 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { // send it to the recombine operator err = p.recombineParser.Process(ctx, entry) if err != nil { - return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "recombine the crio log")) + return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "recombine the cri log")) } default: return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "invalid container log format")) From c4d32ad682b24bdc4fe15c8484e1086053c25584 Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Wed, 23 Oct 2024 09:56:54 +0530 Subject: [PATCH 11/16] factoring --- pkg/stanza/operator/parser/container/parser.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 98e302b04f0c..b086ae3fcc20 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -97,15 +97,6 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { p.asyncConsumerStarted = true }) - // Short circuit if the "if" condition does not match - skip, err := p.Skip(ctx, entry) - if err != nil { - return p.HandleEntryError(ctx, entry, err) - } - if skip { - return p.Write(ctx, entry) - } - if format == containerdFormat { // parse the message err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseContainerd, p.cbGoTimeLayout) @@ -248,8 +239,7 @@ func (p *Parser) handleAttributeMappings(e *entry.Entry) error { } func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { - timeLayout := goTimeLayout - err := parseTime(e, timeLayout) + err := parseTime(e, goTimeLayout) if err != nil { return fmt.Errorf("failed to parse time: %w", err) } @@ -263,8 +253,7 @@ func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { } func (p *Parser) cbCrioTimeLayout(e *entry.Entry) error { - timeLayout := crioTimeLayout - err := parseTime(e, timeLayout) + err := parseTime(e, crioTimeLayout) if err != nil { return fmt.Errorf("failed to parse time: %w", err) } From fcdcf19e6335ddeff8a3c1aba6321ab1748b352e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 23 Oct 2024 13:29:08 +0530 Subject: [PATCH 12/16] Update pkg/stanza/operator/parser/container/parser.go Co-authored-by: Christos Markou --- pkg/stanza/operator/parser/container/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index b086ae3fcc20..862c05d7e6dd 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -253,7 +253,7 @@ func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { } func (p *Parser) cbCrioTimeLayout(e *entry.Entry) error { - err := parseTime(e, crioTimeLayout) + err := parseTime(e, criTimeLayout) if err != nil { return fmt.Errorf("failed to parse time: %w", err) } From 59cd4197e77628b287084638bc27374d6aab35ec Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 23 Oct 2024 13:29:23 +0530 Subject: [PATCH 13/16] Update pkg/stanza/operator/parser/container/parser.go Co-authored-by: Christos Markou --- pkg/stanza/operator/parser/container/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 862c05d7e6dd..63a45fb21df2 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -252,7 +252,7 @@ func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { return nil } -func (p *Parser) cbCrioTimeLayout(e *entry.Entry) error { +func (p *Parser) cbCRITimeLayout(e *entry.Entry) error { err := parseTime(e, criTimeLayout) if err != nil { return fmt.Errorf("failed to parse time: %w", err) From 0a07cd1416ddfc315e2410ec589384c8019f233e Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Wed, 23 Oct 2024 13:37:27 +0530 Subject: [PATCH 14/16] refactoring 2 --- .../operator/parser/container/parser.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 63a45fb21df2..5a39244b0313 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -106,7 +106,7 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { } else { // parse the message - err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseCRIO, p.cbCrioTimeLayout) + err = p.ParserOperator.ParseWithCallback(ctx, entry, p.parseCRIO, p.cbCRITimeLayout) if err != nil { return p.HandleEntryError(ctx, entry, stanzaerr.Wrap(err, "parse crio log")) } @@ -239,21 +239,16 @@ func (p *Parser) handleAttributeMappings(e *entry.Entry) error { } func (p *Parser) cbGoTimeLayout(e *entry.Entry) error { - err := parseTime(e, goTimeLayout) - if err != nil { - return fmt.Errorf("failed to parse time: %w", err) - } + return p.handleTimeAndAttributes(e, goTimeLayout) - err = p.handleAttributeMappings(e) - if err != nil { - return fmt.Errorf("failed to handle attribute mappings: %w", err) - } - - return nil } func (p *Parser) cbCRITimeLayout(e *entry.Entry) error { - err := parseTime(e, criTimeLayout) + return p.handleTimeAndAttributes(e, crioTimeLayout) +} + +func (p *Parser) handleTimeAndAttributes(e *entry.Entry, layout string) error { + err := parseTime(e, layout) if err != nil { return fmt.Errorf("failed to parse time: %w", err) } From 28874f5d596d52d3248f41380ed08794077fd56d Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Wed, 30 Oct 2024 18:07:49 +0530 Subject: [PATCH 15/16] parser.go --- pkg/stanza/operator/helper/parser.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index 198b1375d626..760b249fee78 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -136,6 +136,9 @@ func (p *ParserOperator) ParseWithCallback(ctx context.Context, entry *entry.Ent } if err = p.ParseWith(ctx, entry, parse); err != nil { + if p.OnError == DropOnErrorQuiet || p.OnError == SendOnErrorQuiet { + return nil + } return err } if cb != nil { From f7c965c9fdbbab4ebbbe9d959c6b44c6a2aa3157 Mon Sep 17 00:00:00 2001 From: khushijain21 Date: Wed, 30 Oct 2024 18:10:23 +0530 Subject: [PATCH 16/16] changelog --- .chloggen/pkg-stanza-error-handling.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/pkg-stanza-error-handling.yaml b/.chloggen/pkg-stanza-error-handling.yaml index 976d2e72654b..574a1a34becb 100644 --- a/.chloggen/pkg-stanza-error-handling.yaml +++ b/.chloggen/pkg-stanza-error-handling.yaml @@ -7,7 +7,7 @@ change_type: bug_fix component: pkg/stanza # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: uses `HandleEntryError` method for error handling +note: Honor `send_quiet`/`drop_quiet` setting for errors occur in container operator # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [35726]