Skip to content

Commit

Permalink
Stop calling ChangeMessageVisibility after ReceiptHandleIsInvalid (el…
Browse files Browse the repository at this point in the history
…astic#29480)

Stop the keepalive goroutine after ErrCodeReceiptHandleIsInvalid is returned by ChangeMessageVisibility.

Add `message_receipt_time` to log messages associated with processing of a given SQS message.

Fix incorrect error being wrapped when ApproximateReceiveCount threshold is reached.
  • Loading branch information
andrewkroh authored Dec 16, 2021
1 parent 8921ea4 commit a0ef4d3
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `threatintel.misp` filters configuration. {issue}27970[27970]
- Fix handling of escaped newlines in the `decode_cef` processor. {issue}16995[16995] {pull}29268[29268]
- Fix `panw` module ingest errors for GLOBALPROTECT logs {pull}29154[29154]
- aws-s3: Stop trying to increase SQS message visibility after ReceiptHandleIsInvalid errors. {pull}29480[29480]
- Fix handling of IPv6 addresses in netflow flow events. {issue}19210[19210] {pull}29383[29383]

*Heartbeat*
Expand Down
18 changes: 16 additions & 2 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws/awserr"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/pkg/errors"
"go.uber.org/multierr"
Expand Down Expand Up @@ -105,7 +106,9 @@ func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI,
}

func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *sqs.Message) error {
log := p.log.With("message_id", *msg.MessageId)
log := p.log.With(
"message_id", *msg.MessageId,
"message_receipt_time", time.Now().UTC())

keepaliveCtx, keepaliveCancel := context.WithCancel(ctx)
defer keepaliveCancel()
Expand Down Expand Up @@ -137,7 +140,7 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *sqs.Message)
if receiveCount, err := strconv.Atoi(v); err == nil && receiveCount >= p.maxReceiveCount {
processingErr = nonRetryableErrorWrap(fmt.Errorf(
"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
receiveCount, p.maxReceiveCount, err))
receiveCount, p.maxReceiveCount, processingErr))
}
}
}
Expand Down Expand Up @@ -180,6 +183,17 @@ func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, w

// Renew visibility.
if err := p.sqs.ChangeMessageVisibility(ctx, msg, p.sqsVisibilityTimeout); err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
switch awsErr.Code() {
case sqs.ErrCodeReceiptHandleIsInvalid:
log.Warnw("Failed to extend message visibility timeout "+
"because SQS receipt handle is no longer valid. "+
"Stopping SQS message keepalive routine.", "error", err)
return
}
}

log.Warnw("Failed to extend message visibility timeout.", "error", err)
}
}
Expand Down
32 changes: 32 additions & 0 deletions x-pack/filebeat/input/awss3/sqs_s3_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws/awserr"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -161,6 +164,35 @@ func TestSQSS3EventProcessor(t *testing.T) {
})
}

func TestSqsProcessor_keepalive(t *testing.T) {
msg := newSQSMessage(newS3Event("log.json"))

// Test will call ChangeMessageVisibility once and then keepalive will
// exit because the SQS receipt handle is not usable.
t.Run("keepalive stops after receipt handle is invalid", func(t *testing.T) {
const visibilityTimeout = time.Second

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()
mockAPI := NewMockSQSAPI(ctrl)
mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl)

receiptHandleErr := awserr.New(sqs.ErrCodeReceiptHandleIsInvalid, "fake receipt handle is invalid.", nil)

mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)).
Times(1).Return(receiptHandleErr)

p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockS3HandlerFactory)
var wg sync.WaitGroup
wg.Add(1)
p.keepalive(ctx, p.log, &wg, &msg)
wg.Wait()
})
}

func TestSqsProcessor_getS3Notifications(t *testing.T) {
logp.TestingSetup()

Expand Down

0 comments on commit a0ef4d3

Please sign in to comment.