diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8bff85b9747..a4ead1451b3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767] - Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225] - Fix session reset detection and a crash in Netflow input. {pull}14904[14904] +- Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545] - netflow: Allow for options templates without scope fields. {pull}15449[15449] - netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] - netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 3ef20ee1041..eef16d918fa 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -232,6 +232,7 @@ func (p *s3Input) Wait() { func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) { var wg sync.WaitGroup numMessages := len(messages) + p.logger.Debugf("Processing %v messages", numMessages) wg.Add(numMessages * 2) // process messages received from sqs @@ -251,14 +252,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } + p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos)) // read from s3 object and create event for each log line err = p.handleS3Objects(svcS3, s3Infos, errC) if err != nil { err = errors.Wrap(err, "handleS3Objects failed") p.logger.Error(err) - errC <- err + return } + p.logger.Debugf("handleS3Objects succeed") } func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) { @@ -288,13 +291,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } return case <-time.After(time.Duration(visibilityTimeout/2) * time.Second): + p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated") // If half of the set visibilityTimeout passed and this is // still ongoing, then change visibility timeout. err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout) } } } @@ -370,8 +374,11 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC // read from s3 object reader, err := p.newS3BucketReader(svc, s3Info) if err != nil { - return errors.Wrap(err, "newS3BucketReader failed") + err = errors.Wrap(err, "newS3BucketReader failed") + s3Context.setError(err) + return err } + if reader == nil { continue } @@ -382,7 +389,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) if err != nil { err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil @@ -403,12 +410,14 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } return nil } else if err != nil { - return errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + s3Context.setError(err) + return err } // create event per log line @@ -417,7 +426,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) + s3Context.setError(err) return err } } @@ -610,11 +619,6 @@ func s3ObjectHash(s3Info s3Info) string { return prefix[:10] } -func (c *s3Context) Fail(err error) { - c.setError(err) - c.done() -} - func (c *s3Context) setError(err error) { // only care about the last error for now // TODO: add "Typed" error to error for context