Skip to content

Commit

Permalink
indexshipper/storage: fix race conditions (#10314)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
There were many race conditions reported if you run `go test -race` on
this package.
Google's 'singleflight' package is much neater and makes all the race
warnings go away.

**Which issue(s) this PR fixes**:
Relates to #8586

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- NA Documentation added
- NA Tests updated
- [x] `CHANGELOG.md` updated
- NA If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- NA Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- NA For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
  • Loading branch information
bboreham authored Aug 30, 2023
1 parent f58ef70 commit a32cbeb
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
* [9949](https://github.com/grafana/loki/pull/9949) **masslessparticle**: Fix pipelines to clear caches when tailing to avoid resource exhaustion.
* [9936](https://github.com/grafana/loki/pull/9936) **masslessparticle**: Fix the way query stages are reordered when `unpack` is present.
* [10314](https://github.com/grafana/loki/pull/10314) **bboreham**: Fix race conditions in indexshipper.
* [10309](https://github.com/grafana/loki/pull/10309) **akhilanarayanan**: Fix race condition in series index store.

##### Changes
Expand Down
137 changes: 55 additions & 82 deletions pkg/storage/stores/indexshipper/storage/cached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log/level"
"golang.org/x/sync/singleflight"

"github.com/grafana/loki/pkg/storage/chunk/client"
util_log "github.com/grafana/loki/pkg/util/log"
Expand All @@ -17,6 +18,7 @@ import (

const (
cacheTimeout = 1 * time.Minute
refreshKey = "refresh"
)

type table struct {
Expand All @@ -26,19 +28,16 @@ type table struct {
userIDs []client.StorageCommonPrefix
userObjects map[string][]client.StorageObject

cacheBuiltAt time.Time
buildCacheChan chan struct{}
buildCacheWg sync.WaitGroup
err error
cacheBuiltAt time.Time
buildCacheGroup singleflight.Group
}

func newTable(tableName string) *table {
return &table{
name: tableName,
buildCacheChan: make(chan struct{}, 1),
userIDs: []client.StorageCommonPrefix{},
userObjects: map[string][]client.StorageObject{},
commonObjects: []client.StorageObject{},
name: tableName,
userIDs: []client.StorageCommonPrefix{},
userObjects: map[string][]client.StorageObject{},
commonObjects: []client.StorageObject{},
}
}

Expand All @@ -50,43 +49,20 @@ type cachedObjectClient struct {
tablesMtx sync.RWMutex
tableNamesCacheBuiltAt time.Time

buildTableNamesCacheChan chan struct{}
buildTableNamesCacheWg sync.WaitGroup
err error
buildCacheGroup singleflight.Group
}

func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient {
return &cachedObjectClient{
ObjectClient: downstreamClient,
tables: map[string]*table{},
buildTableNamesCacheChan: make(chan struct{}, 1),
}
}

// buildCacheOnce makes sure we build the cache just once when it is called concurrently.
// We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
// We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
// we are doing read-through cache, and we do not want to serve stale results.
func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) {
buildCacheWg.Add(1)
defer buildCacheWg.Done()

// when the cache is expired, only one concurrent call must be able to rebuild it
// all other calls will wait until the cache is built successfully or failed with an error
select {
case buildCacheChan <- struct{}{}:
buildCacheFunc()
<-buildCacheChan
default:
ObjectClient: downstreamClient,
tables: map[string]*table{},
}
}

func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) {
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
c.err = nil
c.err = c.buildTableNamesCache(ctx, true)
_, _, _ = c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
return nil, c.buildTableNamesCache(ctx)
})
c.buildTableNamesCacheWg.Wait()
}

func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
Expand All @@ -103,20 +79,23 @@ func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableNa
}
}

buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
tbl.err = nil
tbl.err = tbl.buildCache(ctx, c.ObjectClient, true)
_, _, _ = tbl.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
err := tbl.buildCache(ctx, c.ObjectClient)
return nil, err
})
tbl.buildCacheWg.Wait()
}

func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
if bypassCache {
return c.ObjectClient.List(ctx, prefix, objectDelimiter)
}

c.tablesMtx.RLock()
neverBuiltCache := c.tableNamesCacheBuiltAt.IsZero()
c.tablesMtx.RUnlock()

// if we have never built table names cache, let us build it first.
if c.tableNamesCacheBuiltAt.IsZero() {
if neverBuiltCache {
c.RefreshIndexTableNamesCache(ctx)
}

Expand Down Expand Up @@ -147,18 +126,9 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
}

func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) {
if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout {
buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
c.err = nil
c.err = c.buildTableNamesCache(ctx, false)
})
}

// wait for cache build operation to finish, if running
c.buildTableNamesCacheWg.Wait()

if c.err != nil {
return nil, c.err
err := c.updateTableNamesCache(ctx)
if err != nil {
return nil, err
}

c.tablesMtx.RLock()
Expand All @@ -173,18 +143,9 @@ func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([
return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
}

if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
tbl.err = nil
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
})
}

// wait for cache build operation to finish, if running
tbl.buildCacheWg.Wait()

if tbl.err != nil {
return nil, nil, tbl.err
err := tbl.updateCache(ctx, c.ObjectClient)
if err != nil {
return nil, nil, err
}

tbl.mtx.RLock()
Expand All @@ -199,18 +160,9 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
return []client.StorageObject{}, nil
}

if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
tbl.err = nil
tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
})
}

// wait for cache build operation to finish, if running
tbl.buildCacheWg.Wait()

if tbl.err != nil {
return nil, tbl.err
err := tbl.updateCache(ctx, c.ObjectClient)
if err != nil {
return nil, err
}

tbl.mtx.RLock()
Expand All @@ -223,11 +175,21 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
return []client.StorageObject{}, nil
}

func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) (err error) {
if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout {
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
func (c *cachedObjectClient) updateTableNamesCache(ctx context.Context) error {
c.tablesMtx.RLock()
outOfDate := time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout
c.tablesMtx.RUnlock()
if !outOfDate {
return nil
}
_, err, _ := c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
return nil, c.buildTableNamesCache(ctx)
})
return err
}

func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err error) {
defer func() {
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to build table names cache", "err", err)
Expand Down Expand Up @@ -282,11 +244,22 @@ func (c *cachedObjectClient) getTable(tableName string) *table {
return c.tables[tableName]
}

func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) (err error) {
if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout {
// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
func (t *table) updateCache(ctx context.Context, objectClient client.ObjectClient) error {
t.mtx.RLock()
outOfDate := time.Since(t.cacheBuiltAt) >= cacheTimeout
t.mtx.RUnlock()
if !outOfDate {
return nil
}
_, err, _ := t.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
err := t.buildCache(ctx, objectClient)
return nil, err
})
return err
}

func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient) (err error) {
defer func() {
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to build table cache", "table_name", t.name, "err", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), tc.prefix, "", false)
objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), tc.prefix, "", false)
require.NoError(t, err)
require.Equal(t, expectedListCallsCount, objectClient.listCallsCount)
require.Equal(t, tc.expectedObjects, objects)
Expand Down
Loading

0 comments on commit a32cbeb

Please sign in to comment.