From a0ef4d3af0388562674684e3fba3cb56008a5fbb Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 16 Dec 2021 17:21:09 -0500 Subject: [PATCH] Stop calling ChangeMessageVisibility after ReceiptHandleIsInvalid (#29480) Stop the keepalive goroutine after ErrCodeReceiptHandleIsInvalid is returned by ChangeMessageVisibility. Add `message_receipt_time` to log messages associated with processing of a given SQS message. Fix incorrect error being wrapped when ApproximateReceiveCount threshold is reached. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/awss3/sqs_s3_event.go | 18 +++++++++-- .../filebeat/input/awss3/sqs_s3_event_test.go | 32 +++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4a69cc01758..248e1618093 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `threatintel.misp` filters configuration. {issue}27970[27970] - Fix handling of escaped newlines in the `decode_cef` processor. {issue}16995[16995] {pull}29268[29268] - Fix `panw` module ingest errors for GLOBALPROTECT logs {pull}29154[29154] +- aws-s3: Stop trying to increase SQS message visibility after ReceiptHandleIsInvalid errors. {pull}29480[29480] - Fix handling of IPv6 addresses in netflow flow events. {issue}19210[19210] {pull}29383[29383] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index b6641a36c81..d1865aec9cd 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -14,6 +14,7 @@ import ( "sync" "time" + "github.com/aws/aws-sdk-go-v2/aws/awserr" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/pkg/errors" "go.uber.org/multierr" @@ -105,7 +106,9 @@ func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, } func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *sqs.Message) error { - log := p.log.With("message_id", *msg.MessageId) + log := p.log.With( + "message_id", *msg.MessageId, + "message_receipt_time", time.Now().UTC()) keepaliveCtx, keepaliveCancel := context.WithCancel(ctx) defer keepaliveCancel() @@ -137,7 +140,7 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *sqs.Message) if receiveCount, err := strconv.Atoi(v); err == nil && receiveCount >= p.maxReceiveCount { processingErr = nonRetryableErrorWrap(fmt.Errorf( "sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w", - receiveCount, p.maxReceiveCount, err)) + receiveCount, p.maxReceiveCount, processingErr)) } } } @@ -180,6 +183,17 @@ func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, w // Renew visibility. if err := p.sqs.ChangeMessageVisibility(ctx, msg, p.sqsVisibilityTimeout); err != nil { + var awsErr awserr.Error + if errors.As(err, &awsErr) { + switch awsErr.Code() { + case sqs.ErrCodeReceiptHandleIsInvalid: + log.Warnw("Failed to extend message visibility timeout "+ + "because SQS receipt handle is no longer valid. "+ + "Stopping SQS message keepalive routine.", "error", err) + return + } + } + log.Warnw("Failed to extend message visibility timeout.", "error", err) } } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index ad6d30056d4..6100dbe3119 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -8,9 +8,12 @@ import ( "context" "errors" "fmt" + "sync" "testing" "time" + "github.com/aws/aws-sdk-go-v2/aws/awserr" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -161,6 +164,35 @@ func TestSQSS3EventProcessor(t *testing.T) { }) } +func TestSqsProcessor_keepalive(t *testing.T) { + msg := newSQSMessage(newS3Event("log.json")) + + // Test will call ChangeMessageVisibility once and then keepalive will + // exit because the SQS receipt handle is not usable. + t.Run("keepalive stops after receipt handle is invalid", func(t *testing.T) { + const visibilityTimeout = time.Second + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) + + receiptHandleErr := awserr.New(sqs.ErrCodeReceiptHandleIsInvalid, "fake receipt handle is invalid.", nil) + + mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)). + Times(1).Return(receiptHandleErr) + + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockS3HandlerFactory) + var wg sync.WaitGroup + wg.Add(1) + p.keepalive(ctx, p.log, &wg, &msg) + wg.Wait() + }) +} + func TestSqsProcessor_getS3Notifications(t *testing.T) { logp.TestingSetup()