Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Handle error message in handleS3Objects function #15545

Merged
merged 4 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767]
- Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225]
- Fix session reset detection and a crash in Netflow input. {pull}14904[14904]
- Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545]
- netflow: Allow for options templates without scope fields. {pull}15449[15449]
- netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449]
- netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449]
Expand Down
28 changes: 16 additions & 12 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (p *s3Input) Wait() {
func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) {
var wg sync.WaitGroup
numMessages := len(messages)
p.logger.Debugf("Processing %v messages", numMessages)
wg.Add(numMessages * 2)

// process messages received from sqs
Expand All @@ -251,14 +252,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
}
p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want all these debug messages in? Other than that, LGTM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, right now we don't have enough to check where the code went wrong 😂 These messages won't show up unless DEBUG logging is enabled.


// read from s3 object and create event for each log line
err = p.handleS3Objects(svcS3, s3Infos, errC)
if err != nil {
err = errors.Wrap(err, "handleS3Objects failed")
p.logger.Error(err)
errC <- err
return
}
p.logger.Debugf("handleS3Objects succeed")
}

func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) {
Expand Down Expand Up @@ -288,13 +291,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess
}
return
case <-time.After(time.Duration(visibilityTimeout/2) * time.Second):
p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated")
// If half of the set visibilityTimeout passed and this is
// still ongoing, then change visibility timeout.
err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
if err != nil {
p.logger.Error(errors.Wrap(err, "change message visibility failed"))
}
p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout)
p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout)
}
}
}
Expand Down Expand Up @@ -370,8 +374,11 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
// read from s3 object
reader, err := p.newS3BucketReader(svc, s3Info)
if err != nil {
return errors.Wrap(err, "newS3BucketReader failed")
err = errors.Wrap(err, "newS3BucketReader failed")
s3Context.setError(err)
return err
}

if reader == nil {
continue
}
Expand All @@ -382,7 +389,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
return nil
Expand All @@ -403,12 +410,14 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
return nil
} else if err != nil {
return errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key)
s3Context.setError(err)
return err
}

// create event per log line
Expand All @@ -417,7 +426,7 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC
err = p.forwardEvent(event)
if err != nil {
err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key)
s3Context.Fail(err)
s3Context.setError(err)
return err
}
}
Expand Down Expand Up @@ -610,11 +619,6 @@ func s3ObjectHash(s3Info s3Info) string {
return prefix[:10]
}

func (c *s3Context) Fail(err error) {
c.setError(err)
c.done()
}

func (c *s3Context) setError(err error) {
// only care about the last error for now
// TODO: add "Typed" error to error for context
Expand Down