From 1f701b22f7fda77d99d7ca0a12215500ca42a95d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 13 Oct 2020 11:36:12 -0600 Subject: [PATCH] [Filebeat] Add check for context.DeadlineExceeded error (#21732) (cherry picked from commit 7addb4d45502d6f1bc0d5ea9823dc5a926cda9ea) --- x-pack/filebeat/input/s3/collector.go | 34 ++++++++++++--------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index 2976dd52a5b..73e12acfeab 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -148,8 +148,7 @@ func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Messag // read from s3 object and create event for each log line err = c.handleS3Objects(svcS3, s3Infos, errC) if err != nil { - err = fmt.Errorf("handleS3Objects failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("handleS3Objects failed: %w", err)) return err } c.logger.Debugf("handleS3Objects succeed") @@ -163,7 +162,12 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs. return nil case err := <-errC: if err != nil { - c.logger.Warn("Processing message failed, updating visibility timeout") + if err == context.DeadlineExceeded { + c.logger.Info("Context deadline exceeded, updating visibility timeout") + } else { + c.logger.Warnf("Processing message failed '%w', updating visibility timeout", err) + } + err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err)) @@ -298,8 +302,7 @@ func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, e c.logger.Debugf("Processing file from s3 bucket \"%s\" with name \"%s\"", info.name, info.key) err := c.createEventsFromS3Info(svc, info, s3Ctx) if err != nil { - err = fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err)) s3Ctx.setError(err) } } @@ -326,8 +329,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, // If the SDK can determine the request or retry delay was canceled // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - err = fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } @@ -345,16 +347,14 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, isS3ObjGzipped, err := isStreamGzipped(reader) if err != nil { - err = fmt.Errorf("could not determine if S3 object is gzipped: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err)) return err } if isS3ObjGzipped { gzipReader, err := gzip.NewReader(reader) if err != nil { - err = fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } reader = bufio.NewReader(gzipReader) @@ -366,8 +366,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, decoder := json.NewDecoder(reader) err := c.decodeJSON(decoder, objectHash, info, s3Ctx) if err != nil { - err = fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } return nil @@ -383,14 +382,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { - err = fmt.Errorf("forwardEvent failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) return err } return nil } else if err != nil { - err = fmt.Errorf("readStringAndTrimDelimiter failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err)) return err } @@ -403,8 +400,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { - err = fmt.Errorf("forwardEvent failed: %w", err) - c.logger.Error(err) + c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) return err } }