diff --git a/CHANGELOG.md b/CHANGELOG.md
index d7286078c9006..63e152014d5c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
 * [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
 * [9949](https://github.com/grafana/loki/pull/9949) **masslessparticle**: Fix pipelines to clear caches when tailing to avoid resource exhaustion.
 * [9936](https://github.com/grafana/loki/pull/9936) **masslessparticle**: Fix the way query stages are reordered when `unpack` is present.
+* [10314](https://github.com/grafana/loki/pull/10314) **bboreham**: Fix race conditions in indexshipper.
 * [10309](https://github.com/grafana/loki/pull/10309) **akhilanarayanan**: Fix race condition in series index store.
 
 ##### Changes
diff --git a/pkg/storage/stores/indexshipper/storage/cached_client.go b/pkg/storage/stores/indexshipper/storage/cached_client.go
index 5c7c6abf72937..7201bf39a195d 100644
--- a/pkg/storage/stores/indexshipper/storage/cached_client.go
+++ b/pkg/storage/stores/indexshipper/storage/cached_client.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log/level"
+	"golang.org/x/sync/singleflight"
 
 	"github.com/grafana/loki/pkg/storage/chunk/client"
 	util_log "github.com/grafana/loki/pkg/util/log"
@@ -17,6 +18,7 @@ import (
 
 const (
 	cacheTimeout = 1 * time.Minute
+	refreshKey   = "refresh"
 )
 
 type table struct {
@@ -26,19 +28,16 @@ type table struct {
 	userIDs       []client.StorageCommonPrefix
 	userObjects   map[string][]client.StorageObject
 
-	cacheBuiltAt   time.Time
-	buildCacheChan chan struct{}
-	buildCacheWg   sync.WaitGroup
-	err            error
+	cacheBuiltAt    time.Time
+	buildCacheGroup singleflight.Group
 }
 
 func newTable(tableName string) *table {
 	return &table{
-		name:           tableName,
-		buildCacheChan: make(chan struct{}, 1),
-		userIDs:        []client.StorageCommonPrefix{},
-		userObjects:    map[string][]client.StorageObject{},
-		commonObjects:  []client.StorageObject{},
+		name:          tableName,
+		userIDs:       []client.StorageCommonPrefix{},
+		userObjects:   map[string][]client.StorageObject{},
+		commonObjects: []client.StorageObject{},
 	}
 }
 
@@ -50,43 +49,20 @@ type cachedObjectClient struct {
 
 	tablesMtx              sync.RWMutex
 	tableNamesCacheBuiltAt time.Time
-	buildTableNamesCacheChan chan struct{}
-	buildTableNamesCacheWg   sync.WaitGroup
-	err                      error
+	buildCacheGroup singleflight.Group
 }
 
 func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient {
 	return &cachedObjectClient{
-		ObjectClient:             downstreamClient,
-		tables:                   map[string]*table{},
-		buildTableNamesCacheChan: make(chan struct{}, 1),
-	}
-}
-
-// buildCacheOnce makes sure we build the cache just once when it is called concurrently.
-// We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
-// We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
-// we are doing read-through cache, and we do not want to serve stale results.
-func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) {
-	buildCacheWg.Add(1)
-	defer buildCacheWg.Done()
-
-	// when the cache is expired, only one concurrent call must be able to rebuild it
-	// all other calls will wait until the cache is built successfully or failed with an error
-	select {
-	case buildCacheChan <- struct{}{}:
-		buildCacheFunc()
-		<-buildCacheChan
-	default:
+		ObjectClient: downstreamClient,
+		tables:       map[string]*table{},
 	}
 }
 
 func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) {
-	buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
-		c.err = nil
-		c.err = c.buildTableNamesCache(ctx, true)
+	_, _, _ = c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
+		return nil, c.buildTableNamesCache(ctx)
 	})
-	c.buildTableNamesCacheWg.Wait()
 }
 
 func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
@@ -103,11 +79,10 @@ func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableNa
 		}
 	}
 
-	buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
-		tbl.err = nil
-		tbl.err = tbl.buildCache(ctx, c.ObjectClient, true)
+	_, _, _ = tbl.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
+		err := tbl.buildCache(ctx, c.ObjectClient)
+		return nil, err
 	})
-	tbl.buildCacheWg.Wait()
 }
 
 func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
@@ -115,8 +90,12 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
 		return c.ObjectClient.List(ctx, prefix, objectDelimiter)
 	}
 
+	c.tablesMtx.RLock()
+	neverBuiltCache := c.tableNamesCacheBuiltAt.IsZero()
+	c.tablesMtx.RUnlock()
+
 	// if we have never built table names cache, let us build it first.
-	if c.tableNamesCacheBuiltAt.IsZero() {
+	if neverBuiltCache {
 		c.RefreshIndexTableNamesCache(ctx)
 	}
@@ -147,18 +126,9 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
 }
 
 func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) {
-	if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout {
-		buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
-			c.err = nil
-			c.err = c.buildTableNamesCache(ctx, false)
-		})
-	}
-
-	// wait for cache build operation to finish, if running
-	c.buildTableNamesCacheWg.Wait()
-
-	if c.err != nil {
-		return nil, c.err
+	err := c.updateTableNamesCache(ctx)
+	if err != nil {
+		return nil, err
 	}
 
 	c.tablesMtx.RLock()
@@ -173,18 +143,9 @@ func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([
 		return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
 	}
 
-	if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
-		buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
-			tbl.err = nil
-			tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
-		})
-	}
-
-	// wait for cache build operation to finish, if running
-	tbl.buildCacheWg.Wait()
-
-	if tbl.err != nil {
-		return nil, nil, tbl.err
+	err := tbl.updateCache(ctx, c.ObjectClient)
+	if err != nil {
+		return nil, nil, err
 	}
 
 	tbl.mtx.RLock()
@@ -199,18 +160,9 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
 		return []client.StorageObject{}, nil
 	}
 
-	if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
-		buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
-			tbl.err = nil
-			tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
-		})
-	}
-
-	// wait for cache build operation to finish, if running
-	tbl.buildCacheWg.Wait()
-
-	if tbl.err != nil {
-		return nil, tbl.err
+	err := tbl.updateCache(ctx, c.ObjectClient)
+	if err != nil {
+		return nil, err
 	}
 
 	tbl.mtx.RLock()
@@ -223,11 +175,21 @@ func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName
 	return []client.StorageObject{}, nil
 }
 
-func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) (err error) {
-	if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout {
+// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
+func (c *cachedObjectClient) updateTableNamesCache(ctx context.Context) error {
+	c.tablesMtx.RLock()
+	outOfDate := time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout
+	c.tablesMtx.RUnlock()
+	if !outOfDate {
 		return nil
 	}
+	_, err, _ := c.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
+		return nil, c.buildTableNamesCache(ctx)
+	})
+	return err
+}
 
+func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context) (err error) {
 	defer func() {
 		if err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to build table names cache", "err", err)
@@ -282,11 +244,22 @@ func (c *cachedObjectClient) getTable(tableName string) *table {
 	return c.tables[tableName]
 }
 
-func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) (err error) {
-	if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout {
+// Check if the cache is out of date, and build it if so, ensuring only one cache-build is running at a time.
+func (t *table) updateCache(ctx context.Context, objectClient client.ObjectClient) error {
+	t.mtx.RLock()
+	outOfDate := time.Since(t.cacheBuiltAt) >= cacheTimeout
+	t.mtx.RUnlock()
+	if !outOfDate {
 		return nil
 	}
+	_, err, _ := t.buildCacheGroup.Do(refreshKey, func() (interface{}, error) {
+		err := t.buildCache(ctx, objectClient)
+		return nil, err
+	})
+	return err
+}
 
+func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient) (err error) {
 	defer func() {
 		if err != nil {
 			level.Error(util_log.Logger).Log("msg", "failed to build table cache", "table_name", t.name, "err", err)
diff --git a/pkg/storage/stores/indexshipper/storage/cached_client_test.go b/pkg/storage/stores/indexshipper/storage/cached_client_test.go
index a076594de5cbf..7083b812ff2a5 100644
--- a/pkg/storage/stores/indexshipper/storage/cached_client_test.go
+++ b/pkg/storage/stores/indexshipper/storage/cached_client_test.go
@@ -219,7 +219,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), tc.prefix, "", false)
+				objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), tc.prefix, "", false)
 				require.NoError(t, err)
 				require.Equal(t, expectedListCallsCount, objectClient.listCallsCount)
 				require.Equal(t, tc.expectedObjects, objects)
diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go
new file mode 100644
index 0000000000000..8473fb7922c16
--- /dev/null
+++ b/vendor/golang.org/x/sync/singleflight/singleflight.go
@@ -0,0 +1,205 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package singleflight provides a duplicate function call suppression
+// mechanism.
+package singleflight // import "golang.org/x/sync/singleflight"
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"runtime"
+	"runtime/debug"
+	"sync"
+)
+
+// errGoexit indicates the runtime.Goexit was called in
+// the user given function.
+var errGoexit = errors.New("runtime.Goexit was called")
+
+// A panicError is an arbitrary value recovered from a panic
+// with the stack trace during the execution of given function.
+type panicError struct {
+	value interface{}
+	stack []byte
+}
+
+// Error implements error interface.
+func (p *panicError) Error() string {
+	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
+}
+
+func newPanicError(v interface{}) error {
+	stack := debug.Stack()
+
+	// The first line of the stack trace is of the form "goroutine N [status]:"
+	// but by the time the panic reaches Do the goroutine may no longer exist
+	// and its status will have changed. Trim out the misleading line.
+	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
+		stack = stack[line+1:]
+	}
+	return &panicError{value: v, stack: stack}
+}
+
+// call is an in-flight or completed singleflight.Do call
+type call struct {
+	wg sync.WaitGroup
+
+	// These fields are written once before the WaitGroup is done
+	// and are only read after the WaitGroup is done.
+	val interface{}
+	err error
+
+	// These fields are read and written with the singleflight
+	// mutex held before the WaitGroup is done, and are read but
+	// not written after the WaitGroup is done.
+	dups  int
+	chans []chan<- Result
+}
+
+// Group represents a class of work and forms a namespace in
+// which units of work can be executed with duplicate suppression.
+type Group struct {
+	mu sync.Mutex       // protects m
+	m  map[string]*call // lazily initialized
+}
+
+// Result holds the results of Do, so they can be passed
+// on a channel.
+type Result struct {
+	Val    interface{}
+	Err    error
+	Shared bool
+}
+
+// Do executes and returns the results of the given function, making
+// sure that only one execution is in-flight for a given key at a
+// time. If a duplicate comes in, the duplicate caller waits for the
+// original to complete and receives the same results.
+// The return value shared indicates whether v was given to multiple callers.
+func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		g.mu.Unlock()
+		c.wg.Wait()
+
+		if e, ok := c.err.(*panicError); ok {
+			panic(e)
+		} else if c.err == errGoexit {
+			runtime.Goexit()
+		}
+		return c.val, c.err, true
+	}
+	c := new(call)
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	g.doCall(c, key, fn)
+	return c.val, c.err, c.dups > 0
+}
+
+// DoChan is like Do but returns a channel that will receive the
+// results when they are ready.
+//
+// The returned channel will not be closed.
+func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
+	ch := make(chan Result, 1)
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		c.chans = append(c.chans, ch)
+		g.mu.Unlock()
+		return ch
+	}
+	c := &call{chans: []chan<- Result{ch}}
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	go g.doCall(c, key, fn)
+
+	return ch
+}
+
+// doCall handles the single call for a key.
+func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
+	normalReturn := false
+	recovered := false
+
+	// use double-defer to distinguish panic from runtime.Goexit,
+	// more details see https://golang.org/cl/134395
+	defer func() {
+		// the given function invoked runtime.Goexit
+		if !normalReturn && !recovered {
+			c.err = errGoexit
+		}
+
+		g.mu.Lock()
+		defer g.mu.Unlock()
+		c.wg.Done()
+		if g.m[key] == c {
+			delete(g.m, key)
+		}
+
+		if e, ok := c.err.(*panicError); ok {
+			// In order to prevent the waiting channels from being blocked forever,
+			// needs to ensure that this panic cannot be recovered.
+			if len(c.chans) > 0 {
+				go panic(e)
+				select {} // Keep this goroutine around so that it will appear in the crash dump.
+			} else {
+				panic(e)
+			}
+		} else if c.err == errGoexit {
+			// Already in the process of goexit, no need to call again
+		} else {
+			// Normal return
+			for _, ch := range c.chans {
+				ch <- Result{c.val, c.err, c.dups > 0}
+			}
+		}
+	}()
+
+	func() {
+		defer func() {
+			if !normalReturn {
+				// Ideally, we would wait to take a stack trace until we've determined
+				// whether this is a panic or a runtime.Goexit.
+				//
+				// Unfortunately, the only way we can distinguish the two is to see
+				// whether the recover stopped the goroutine from terminating, and by
+				// the time we know that, the part of the stack trace relevant to the
+				// panic has been discarded.
+				if r := recover(); r != nil {
+					c.err = newPanicError(r)
+				}
+			}
+		}()
+
+		c.val, c.err = fn()
+		normalReturn = true
+	}()
+
+	if !normalReturn {
+		recovered = true
+	}
+}
+
+// Forget tells the singleflight to forget about a key. Future calls
+// to Do for this key will call the function rather than waiting for
+// an earlier call to complete.
+func (g *Group) Forget(key string) {
+	g.mu.Lock()
+	delete(g.m, key)
+	g.mu.Unlock()
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index fdadfdc6a0a8c..08d1dd88c7cd0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1569,6 +1569,7 @@ golang.org/x/oauth2/jwt
 ## explicit; go 1.17
 golang.org/x/sync/errgroup
 golang.org/x/sync/semaphore
+golang.org/x/sync/singleflight
 # golang.org/x/sys v0.10.0
 ## explicit; go 1.17
 golang.org/x/sys/cpu
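
Note, not part of the patch: a minimal, self-contained sketch of the singleflight pattern this change adopts. The rebuild closure, the "refresh" key literal, the ten-goroutine fan-out, and the sleep below are illustrative stand-ins; only singleflight.Group.Do and its signature come from the vendored package above. Concurrent callers of Do that share a key collapse into one execution of the function, and every caller receives that execution's error directly, which is what lets the patch drop the racy shared err fields that the old buildCacheOnce channel-plus-WaitGroup scheme had to maintain.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var rebuilds int32 // counts how many times the expensive rebuild actually runs

	// rebuild stands in for buildTableNamesCache/buildCache: a slow, fallible
	// operation that must never run concurrently with itself.
	rebuild := func() (interface{}, error) {
		atomic.AddInt32(&rebuilds, 1)
		time.Sleep(50 * time.Millisecond) // simulate listing object storage
		return nil, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every goroutine uses the same key (the patch uses refreshKey), so
			// one of them runs rebuild while the rest block inside Do and then
			// share its result, error included.
			if _, err, _ := g.Do("refresh", rebuild); err != nil {
				fmt.Println("rebuild failed:", err)
			}
		}()
	}
	wg.Wait()

	fmt.Println("rebuild executions:", atomic.LoadInt32(&rebuilds)) // typically 1
}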