Skip to content

Commit

Permalink
indexshipper/compactor: fix two race conditions (#10313)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
We must call wg.Add before starting the goroutine.
And protect the data in mockCompactorClient.
And another fix in shipper/index/compactor.

**Which issue(s) this PR fixes**:
Part of #8586

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- NA Documentation added
- NA Tests updated
- NA `CHANGELOG.md` updated
- NA If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- NA Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- NA For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
bboreham authored Aug 22, 2023
1 parent 43e542e commit ecd371d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package deletion

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -46,11 +47,11 @@ func TestGetCacheGenNumberForUser(t *testing.T) {
require.Nil(t, err)
require.Equal(t, "test-request", deleteRequests[0].RequestID)

compactorClient.delRequests = []DeleteRequest{
compactorClient.SetDeleteRequests([]DeleteRequest{
{
RequestID: "different",
},
}
})

deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID")
require.Nil(t, err)
Expand All @@ -67,11 +68,20 @@ func TestGetCacheGenNumberForUser(t *testing.T) {
}

type mockCompactorClient struct {
mx sync.Mutex
delRequests []DeleteRequest
cacheGenNum string
}

func (m *mockCompactorClient) SetDeleteRequests(d []DeleteRequest) {
m.mx.Lock()
m.delRequests = d
m.mx.Unlock()
}

func (m *mockCompactorClient) GetAllDeleteRequestsForUser(_ context.Context, _ string) ([]DeleteRequest, error) {
m.mx.Lock()
defer m.mx.Unlock()
return m.delRequests, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func newDeleteRequestsTable(workingDirectory string, indexStorageClient storage.
return nil, err
}

table.wg.Add(1)
go table.loop()
return table, nil
}
Expand Down Expand Up @@ -82,7 +83,6 @@ func (t *deleteRequestsTable) loop() {
uploadTicker := time.NewTicker(5 * time.Minute)
defer uploadTicker.Stop()

t.wg.Add(1)
defer t.wg.Done()

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (t *tableCompactor) compactCommonIndexes(ctx context.Context) (*CompactedIn
// not locking the mutex here since there should be no writers at this point
downloadedDB := dbsToRead[workNum]

err = readFile(idxSet.GetLogger(), downloadedDB, func(bucketName string, batch []indexEntry) error {
return readFile(idxSet.GetLogger(), downloadedDB, func(bucketName string, batch []indexEntry) error {
indexFile := compactedFile
if bucketName != shipper_util.GetUnsafeString(local.IndexBucketName) {
t.userCompactedIndexSetMtx.RLock()
Expand All @@ -435,7 +435,6 @@ func (t *tableCompactor) compactCommonIndexes(ctx context.Context) (*CompactedIn

return writeBatch(indexFile, batch)
})
return nil
})

if err != nil {
Expand Down

0 comments on commit ecd371d

Please sign in to comment.