From 9a260a1016bebca06d1087791689a9de6a949e0c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 14 Jan 2020 10:42:27 -0700 Subject: [PATCH 1/3] Handle error message in handleS3Objects function --- x-pack/filebeat/input/s3/input.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 3ef20ee1041..b0539d31de5 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -232,6 +232,7 @@ func (p *s3Input) Wait() { func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) { var wg sync.WaitGroup numMessages := len(messages) + p.logger.Debugf("Processing %v messages", numMessages) wg.Add(numMessages * 2) // process messages received from sqs @@ -257,7 +258,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w if err != nil { err = errors.Wrap(err, "handleS3Objects failed") p.logger.Error(err) - errC <- err + return } } @@ -408,7 +409,9 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC } return nil } else if err != nil { - return errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + s3Context.Fail(err) + return err } // create event per log line From f5e84828984e64e3cf2dba449a18b735de687ef8 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 14 Jan 2020 13:31:25 -0700 Subject: [PATCH 2/3] remove s3Context.Fail and use setError and done instead --- x-pack/filebeat/input/s3/input.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index b0539d31de5..eef16d918fa 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -252,6 +252,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } + p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos)) // read from s3 object and create event for each log line err = p.handleS3Objects(svcS3, s3Infos, errC) @@ -260,6 +261,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w p.logger.Error(err) return } + p.logger.Debugf("handleS3Objects succeed") } func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) { @@ -289,13 +291,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } return case <-time.After(time.Duration(visibilityTimeout/2) * time.Second): + p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated") // If half of the set visibilityTimeout passed and this is // still ongoing, then change visibility timeout. err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout) } } } @@ -371,8 +374,11 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC // read from s3 object reader, err := p.newS3BucketReader(svc, s3Info) if err != nil { - return errors.Wrap(err, "newS3BucketReader failed") + err = errors.Wrap(err, "newS3BucketReader failed") + s3Context.setError(err) + return err } + if reader == nil { continue } @@ -383,7 +389,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) if err != nil { err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil @@ -404,13 +410,13 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil } else if err != nil { err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } @@ -420,7 +426,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } } @@ -613,11 +619,6 @@ func s3ObjectHash(s3Info s3Info) string { return prefix[:10] } -func (c *s3Context) Fail(err error) { - c.setError(err) - c.done() -} - func (c *s3Context) setError(err error) { // only care about the last error for now // TODO: add "Typed" error to error for context From 40355e06eaf510f6e8b7586a2f9f02f15054871d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 14 Jan 2020 13:33:55 -0700 Subject: [PATCH 3/3] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9786d3152d7..01077e5f5bf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -227,6 +227,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767] - Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225] - Fix session reset detection and a crash in Netflow input. {pull}14904[14904] +- Handle errors in handleS3Objects function and add more debug messages. {pull}15545[15545] *Heartbeat*