diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go index 799b7f1e7369d..2ba95b568f070 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go @@ -2,6 +2,7 @@ package deletion import ( "context" + "sync" "testing" "time" @@ -46,11 +47,11 @@ func TestGetCacheGenNumberForUser(t *testing.T) { require.Nil(t, err) require.Equal(t, "test-request", deleteRequests[0].RequestID) - compactorClient.delRequests = []DeleteRequest{ + compactorClient.SetDeleteRequests([]DeleteRequest{ { RequestID: "different", }, - } + }) deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) @@ -67,11 +68,20 @@ func TestGetCacheGenNumberForUser(t *testing.T) { } type mockCompactorClient struct { + mx sync.Mutex delRequests []DeleteRequest cacheGenNum string } +func (m *mockCompactorClient) SetDeleteRequests(d []DeleteRequest) { + m.mx.Lock() + m.delRequests = d + m.mx.Unlock() +} + func (m *mockCompactorClient) GetAllDeleteRequestsForUser(_ context.Context, _ string) ([]DeleteRequest, error) { + m.mx.Lock() + defer m.mx.Unlock() return m.delRequests, nil } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go index efe7f695c1ec7..ae535e31ff452 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_table.go @@ -52,6 +52,7 @@ func newDeleteRequestsTable(workingDirectory string, indexStorageClient storage. return nil, err } + table.wg.Add(1) go table.loop() return table, nil } @@ -82,7 +83,6 @@ func (t *deleteRequestsTable) loop() { uploadTicker := time.NewTicker(5 * time.Minute) defer uploadTicker.Stop() - t.wg.Add(1) defer t.wg.Done() for { diff --git a/pkg/storage/stores/shipper/index/compactor/table_compactor.go b/pkg/storage/stores/shipper/index/compactor/table_compactor.go index b03985a0977ce..6f6c20daf0faf 100644 --- a/pkg/storage/stores/shipper/index/compactor/table_compactor.go +++ b/pkg/storage/stores/shipper/index/compactor/table_compactor.go @@ -420,7 +420,7 @@ func (t *tableCompactor) compactCommonIndexes(ctx context.Context) (*CompactedIn // not locking the mutex here since there should be no writers at this point downloadedDB := dbsToRead[workNum] - err = readFile(idxSet.GetLogger(), downloadedDB, func(bucketName string, batch []indexEntry) error { + return readFile(idxSet.GetLogger(), downloadedDB, func(bucketName string, batch []indexEntry) error { indexFile := compactedFile if bucketName != shipper_util.GetUnsafeString(local.IndexBucketName) { t.userCompactedIndexSetMtx.RLock() @@ -435,7 +435,6 @@ func (t *tableCompactor) compactCommonIndexes(ctx context.Context) (*CompactedIn return writeBatch(indexFile, batch) }) - return nil }) if err != nil {