Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.14](backport #39131) Fix concurrency bugs that could cause data loss in the aws-s3 input #39262

Merged
Merged 2 commits on Apr 29, 2024 (source and target branch names were not captured in this extract).
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go"
Expand All @@ -21,7 +22,6 @@
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-concert/unison"
Expand Down Expand Up @@ -99,6 +99,7 @@
}

func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
<<<<<<< HEAD

Check failure on line 102 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '<<' (typecheck)
var err error

persistentStore, err := in.store.Access()
Expand All @@ -125,6 +126,9 @@
}
}()
defer cancelInputCtx()
=======

Check failure on line 129 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '==' (typecheck)
ctx := v2.GoContextFromCanceler(inputContext.Cancelation)
>>>>>>> e588628b24 (Fix concurrency bugs that could cause data loss in the `aws-s3` input (#39131))

Check failure on line 131 in x-pack/filebeat/input/awss3/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName)
Expand Down Expand Up @@ -168,8 +172,20 @@
}
defer client.Close()

// Connect to the registry and create our states lookup
persistentStore, err := in.store.Access()
if err != nil {
return fmt.Errorf("can not access persistent store: %w", err)
}
defer persistentStore.Close()

states, err := newStates(inputContext, persistentStore)
if err != nil {
return fmt.Errorf("can not start persistent store: %w", err)
}

// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
poller, err := in.createS3Lister(inputContext, ctx, client, states)
if err != nil {
return fmt.Errorf("failed to initialize s3 poller: %w", err)
}
Expand Down Expand Up @@ -240,7 +256,7 @@
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, states *states) (*s3Poller, error) {
var bucketName string
var bucketID string
if in.config.NonAWSBucketName != "" {
Expand All @@ -260,6 +276,12 @@
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 5
// Recover quickly when requests start working again
so.NoRetryIncrement = 100
})
})
regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName)
if err != nil {
Expand Down Expand Up @@ -305,7 +327,6 @@
client,
s3EventHandlerFactory,
states,
persistentStore,
bucketID,
in.config.BucketListPrefix,
in.awsConfig.Region,
Expand Down
14 changes: 6 additions & 8 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"

Expand Down Expand Up @@ -132,7 +133,7 @@ type constantS3 struct {
var _ s3API = (*constantS3)(nil)

func newConstantS3(t testing.TB) *constantS3 {
data, err := ioutil.ReadFile(cloudtrailTestFile)
data, err := os.ReadFile(cloudtrailTestFile)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -342,14 +343,11 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

err = store.Set(awsS3WriteCommitPrefix+"bucket"+listPrefix, &commitWriteState{time.Time{}})
if err != nil {
errChan <- err
return
}
states, err := newStates(inputCtx, store)
assert.NoError(t, err, "states creation should succeed")

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, states, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
Expand Down
Loading
Loading