From 583d3451cde4e809a8b52d94620eac248199d74b Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 6 Dec 2024 08:37:24 -0800 Subject: [PATCH] registry cleanup for unavailable objects (#41694) Signed-off-by: Kavindu Dodanduwa # Conflicts: # x-pack/filebeat/input/awss3/states.go # x-pack/filebeat/input/awss3/states_test.go --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/awss3/s3_input.go | 24 +++++- x-pack/filebeat/input/awss3/states.go | 47 +++++++++-- x-pack/filebeat/input/awss3/states_test.go | 96 +++++++++++++++++++--- 4 files changed, 145 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a846236e934..17aca375832 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -357,6 +357,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add evaluation state dump debugging option to CEL input. {pull}41335[41335] - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] +- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694] *Auditbeat* diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index bc8a21495e5..88f28e39e83 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -115,8 +115,19 @@ func (in *s3PollerInput) runPoll(ctx context.Context) { } // Start reading data and wait for its processing to be done - in.readerLoop(ctx, workChan) + ids, ok := in.readerLoop(ctx, workChan) workerWg.Wait() + + if !ok { + in.log.Warn("skipping state registry cleanup as object reading ended with a non-ok return") + return + } + + // Perform state cleanup operation + err := in.states.CleanUp(ids) + if err != nil { + in.log.Errorf("failed to cleanup states: %v", err.Error()) + } } func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) { @@ -183,7 +194,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) } } -func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) { +// readerLoop performs the S3 object listing and emit state to work listeners if object needs to be processed. +// Returns all tracked state IDs correlates to all tracked S3 objects iff listing is successful. +// These IDs are intended to be used for state clean-up. +func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) (knownStateIDSlice []string, ok bool) { defer close(workChan) bucketName := getBucketNameFromARN(in.config.getBucketARN()) @@ -202,7 +216,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) circuitBreaker++ if circuitBreaker >= readerLoopMaxCircuitBreaker { in.log.Warnw(fmt.Sprintf("%d consecutive error when paginating listing, breaking the circuit.", circuitBreaker), "error", err) - break + return nil, false } } // add a backoff delay and try again @@ -219,6 +233,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects)) for _, object := range page.Contents { state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified) + knownStateIDSlice = append(knownStateIDSlice, state.ID()) + if in.states.IsProcessed(state) { in.log.Debugw("skipping state.", "state", state) continue @@ -229,6 +245,8 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) in.metrics.s3ObjectsProcessedTotal.Inc() } } + + return knownStateIDSlice, true } func (in *s3PollerInput) s3EventForState(state state) s3EventV2 { diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 8aef3de1c99..2bfb9f29cd8 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -21,7 +21,7 @@ const awsS3ObjectStatePrefix = "filebeat::aws-s3::state::" type states struct { // Completed S3 object states, indexed by state ID. // statesLock must be held to access states. - states map[string]state + states map[string]*state statesLock sync.Mutex // The store used to persist state changes to the registry. @@ -70,29 +70,61 @@ func (s *states) AddState(state state) error { id := state.ID() // Update in-memory copy s.statesLock.Lock() - s.states[id] = state + s.states[id] = &state s.statesLock.Unlock() // Persist to the registry s.storeLock.Lock() defer s.storeLock.Unlock() - key := awsS3ObjectStatePrefix + id - if err := s.store.Set(key, state); err != nil { + if err := s.store.Set(getStoreKey(id), state); err != nil { return err } return nil } +// CleanUp performs state and store cleanup based on provided knownIDs. +// knownIDs must contain valid currently tracked state IDs that must be known by this state registry. +// State and underlying storage will be cleaned if ID is no longer present in knownIDs set. +func (s *states) CleanUp(knownIDs []string) error { + knownIDHashSet := map[string]struct{}{} + for _, id := range knownIDs { + knownIDHashSet[id] = struct{}{} + } + + s.storeLock.Lock() + defer s.storeLock.Unlock() + s.statesLock.Lock() + defer s.statesLock.Unlock() + + for id := range s.states { + if _, contains := knownIDHashSet[id]; !contains { + // remove from sate & store as ID is no longer seen in known ID set + delete(s.states, id) + err := s.store.Remove(getStoreKey(id)) + if err != nil { + return fmt.Errorf("error while removing the state for ID %s: %w", id, err) + } + } + } + + return nil +} + func (s *states) Close() { s.storeLock.Lock() s.store.Close() s.storeLock.Unlock() } +// getStoreKey is a helper to generate the key used by underlying persistent storage +func getStoreKey(stateID string) string { + return awsS3ObjectStatePrefix + stateID +} + // loadS3StatesFromRegistry loads a copy of the registry states. // If prefix is set, entries will match the provided prefix(including empty prefix) -func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]state, error) { - stateTable := map[string]state{} +func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix string) (map[string]*state, error) { + stateTable := map[string]*state{} err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { if !strings.HasPrefix(key, awsS3ObjectStatePrefix) { return true, nil @@ -117,9 +149,8 @@ func loadS3StatesFromRegistry(log *logp.Logger, store *statestore.Store, prefix // filter based on prefix and add entry to local copy if strings.HasPrefix(st.Key, prefix) { - stateTable[st.ID()] = st + stateTable[st.ID()] = &st } - return true, nil }) if err != nil { diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index 328b57003cd..fa604ed08d9 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -5,6 +5,7 @@ package awss3 import ( + "fmt" "testing" "time" @@ -42,7 +43,7 @@ func (s *testInputStore) CleanupInterval() time.Duration { func TestStatesAddStateAndIsProcessed(t *testing.T) { type stateTestCase struct { // An initialization callback to invoke on the (initially empty) states. - statesEdit func(states *states) + statesEdit func(states *states) error // The state to call IsProcessed on and the expected result state state @@ -62,42 +63,42 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { expectedIsProcessed: false, }, "not existing state": { - statesEdit: func(states *states) { - _ = states.AddState(testState2) + statesEdit: func(states *states) error { + return states.AddState(testState2) }, state: testState1, expectedIsProcessed: false, }, "existing state": { - statesEdit: func(states *states) { - _ = states.AddState(testState1) + statesEdit: func(states *states) error { + return states.AddState(testState1) }, state: testState1, expectedIsProcessed: true, }, "existing stored state is persisted": { - statesEdit: func(states *states) { + statesEdit: func(states *states) error { state := testState1 state.Stored = true - _ = states.AddState(state) + return states.AddState(state) }, state: testState1, shouldReload: true, expectedIsProcessed: true, }, "existing failed state is persisted": { - statesEdit: func(states *states) { + statesEdit: func(states *states) error { state := testState1 state.Failed = true - _ = states.AddState(state) + return states.AddState(state) }, state: testState1, shouldReload: true, expectedIsProcessed: true, }, "existing unprocessed state is not persisted": { - statesEdit: func(states *states) { - _ = states.AddState(testState1) + statesEdit: func(states *states) error { + return states.AddState(testState1) }, state: testState1, shouldReload: true, @@ -112,7 +113,8 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { states, err := newStates(nil, store, "") require.NoError(t, err, "states creation must succeed") if test.statesEdit != nil { - test.statesEdit(states) + err = test.statesEdit(states) + require.NoError(t, err, "states edit must succeed") } if test.shouldReload { states, err = newStates(nil, store, "") @@ -125,6 +127,76 @@ func TestStatesAddStateAndIsProcessed(t *testing.T) { } } +func TestStatesCleanUp(t *testing.T) { + bucketName := "test-bucket" + lModifiedTime := time.Unix(0, 0) + stateA := newState(bucketName, "a", "a-etag", lModifiedTime) + stateB := newState(bucketName, "b", "b-etag", lModifiedTime) + stateC := newState(bucketName, "c", "c-etag", lModifiedTime) + + tests := []struct { + name string + initStates []state + knownIDs []string + expectIDs []string + }{ + { + name: "No cleanup if not missing from known list", + initStates: []state{stateA, stateB, stateC}, + knownIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()}, + expectIDs: []string{stateA.ID(), stateB.ID(), stateC.ID()}, + }, + { + name: "Clean up if missing from known list", + initStates: []state{stateA, stateB, stateC}, + knownIDs: []string{stateA.ID()}, + expectIDs: []string{stateA.ID()}, + }, + { + name: "Clean up everything", + initStates: []state{stateA, stateC}, // given A, C + knownIDs: []string{stateB.ID()}, // but known B + expectIDs: []string{}, // empty state & store + }, + { + name: "Empty known IDs are valid", + initStates: []state{stateA}, // given A + knownIDs: []string{}, // Known nothing + expectIDs: []string{}, // empty state & store + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := openTestStatestore() + statesInstance, err := newStates(nil, store, "") + require.NoError(t, err, "states creation must succeed") + + for _, s := range test.initStates { + err := statesInstance.AddState(s) + require.NoError(t, err, "state initialization must succeed") + } + + // perform cleanup + err = statesInstance.CleanUp(test.knownIDs) + require.NoError(t, err, "state cleanup must succeed") + + // validate + for _, id := range test.expectIDs { + // must be in local state + _, ok := statesInstance.states[id] + require.True(t, ok, fmt.Errorf("expected id %s in state, but got missing", id)) + + // must be in store + ok, err := statesInstance.store.Has(getStoreKey(id)) + require.NoError(t, err, "state has must succeed") + require.True(t, ok, fmt.Errorf("expected id %s in store, but got missing", id)) + } + }) + } + +} + func TestStatesPrefixHandling(t *testing.T) { logger := logp.NewLogger("state-prefix-testing")