Skip to content

Commit

Permalink
fix: remember shards that fail Open(), avoid repeated attempts (#23437)…
Browse files Browse the repository at this point in the history
… (#23444)

If a shard cannot be opened, store its ID and the last error.
Prevent further attempts to open it during this invocation of
InfluxDB. This information is not persisted.

closes #23428
closes #23426

(cherry picked from commit 54ac7e5)

closes #23433
closes #23435
  • Loading branch information
davidby-influx authored Jun 13, 2022
1 parent 5f1e5bf commit d0dd842
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 7 deletions.
2 changes: 1 addition & 1 deletion coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (w *PointsWriter) writeToShard(writeCtx tsdb.WriteContext, shard *meta.Shar
// store has not actually created this shard, tell it to create it and
// retry the write
if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil {
w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err))
w.Logger.Warn("Write failed creating shard", zap.Uint64("shard", shard.ID), zap.Error(err))
atomic.AddInt64(&w.stats.WriteErr, 1)
return err
}
Expand Down
68 changes: 65 additions & 3 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ type StoreStatistics struct {
SeriesCreated int64
}

// shardErrorMap records, per shard ID, the error produced by the most recent
// failed attempt to open that shard. All access to the map goes through mu.
// The zero value is ready to use.
type shardErrorMap struct {
	mu          sync.Mutex
	shardErrors map[uint64]error
}

// setShardOpenError records the outcome of an open attempt for shardID.
//
// A nil err clears any failure recorded for the shard. A non-nil err is
// wrapped in an *ErrPreviousShardFail (preserving err for errors.Is/As via %w)
// and stored, replacing any earlier entry for the same shard.
func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) {
	se.mu.Lock()
	defer se.mu.Unlock()

	if err == nil {
		// delete on a nil map is a no-op, so no initialization is needed here.
		delete(se.shardErrors, shardID)
		return
	}

	// Lazily allocate so that a zero-value shardErrorMap (for example inside a
	// Store built without NewStore) does not panic on assignment to a nil map.
	if se.shardErrors == nil {
		se.shardErrors = make(map[uint64]error)
	}
	se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)}
}

// shardError returns the error recorded for shardID and whether one exists.
// When no failure is recorded it returns (nil, false).
func (se *shardErrorMap) shardError(shardID uint64) (error, bool) {
	se.mu.Lock()
	defer se.mu.Unlock()

	oldErr, hasErr := se.shardErrors[shardID]
	return oldErr, hasErr
}

// Store manages shards and indexes for databases.
type Store struct {
mu sync.RWMutex
Expand All @@ -97,6 +119,9 @@ type Store struct {
// This prevents new shards from being created while old ones are being deleted.
pendingShardDeletes map[uint64]struct{}

// Maintains a set of shards that failed to open
badShards shardErrorMap

// Epoch tracker helps serialize writes and deletes that may conflict. It
// is stored by shard.
epochs map[uint64]*epochTracker
Expand Down Expand Up @@ -125,6 +150,7 @@ func NewStore(path string) *Store {
sfiles: make(map[string]*SeriesFile),
indexes: make(map[string]interface{}),
pendingShardDeletes: make(map[uint64]struct{}),
badShards: shardErrorMap{shardErrors: make(map[uint64]error)},
epochs: make(map[uint64]*epochTracker),
EngineOptions: NewEngineOptions(),
Logger: logger,
Expand Down Expand Up @@ -449,9 +475,9 @@ func (s *Store) loadShards() error {
shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
shard.WithLogger(s.baseLogger)

err = shard.Open()
err = s.OpenShard(shard, false)
if err != nil {
log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err))
resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
return
}
Expand Down Expand Up @@ -607,6 +633,42 @@ func (s *Store) Shard(id uint64) *Shard {
return sh
}

// ErrPreviousShardFail wraps the error from an earlier failed attempt to open
// a shard. It is returned instead of retrying the open, and can be detected
// with errors.Is using either ErrPreviousShardFail{} or &ErrPreviousShardFail{}
// as the target.
type ErrPreviousShardFail struct {
	error
}

// Unwrap returns the wrapped error (nil for the zero value) so that
// errors.Is and errors.As can inspect the original cause.
func (e ErrPreviousShardFail) Unwrap() error {
	return e.error
}

// Is reports whether err is an ErrPreviousShardFail, in value or pointer
// form. Only the dynamic type of err is considered, not its contents.
func (e ErrPreviousShardFail) Is(err error) bool {
	switch err.(type) {
	case ErrPreviousShardFail, *ErrPreviousShardFail:
		return true
	default:
		return false
	}
}

// Error returns the wrapped error's message. The zero value, which callers
// construct as an errors.Is target, has no wrapped error; it yields a generic
// message rather than panicking on a nil dereference.
func (e ErrPreviousShardFail) Error() string {
	if e.error == nil {
		return "opening shard previously failed"
	}
	return e.error.Error()
}

// OpenShard opens sh, skipping the attempt when an earlier open of the same
// shard ID is recorded as failed.
//
// If a failure is recorded and force is false, the recorded error is returned
// and sh.Open is not called. Otherwise sh.Open is called and its result
// replaces the recorded state: a nil result clears the failure, a non-nil
// result is stored. A nil sh yields an error.
func (s *Store) OpenShard(sh *Shard, force bool) error {
	if sh == nil {
		return errors.New("cannot open nil shard")
	}

	// Short-circuit on a remembered failure unless the caller insists.
	if prevErr, failedBefore := s.badShards.shardError(sh.ID()); failedBefore && !force {
		return prevErr
	}

	openErr := sh.Open()
	s.badShards.setShardOpenError(sh.ID(), openErr)
	return openErr
}

// SetShardOpenErrorForTest forwards shardID and err to the store's bad-shard
// tracker (s.badShards.setShardOpenError), letting callers set or, with a nil
// err, clear the recorded open failure for a shard without actually opening
// it. As the name indicates, it is intended for use from tests.
func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) {
s.badShards.setShardOpenError(shardID, err)
}

// Shards returns a list of shards by id.
func (s *Store) Shards(ids []uint64) []*Shard {
s.mu.RLock()
Expand Down Expand Up @@ -700,7 +762,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en
shard.WithLogger(s.baseLogger)
shard.EnableOnOpen = enabled

if err := shard.Open(); err != nil {
if err := s.OpenShard(shard, false); err != nil {
return err
}

Expand Down
32 changes: 29 additions & 3 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
)

// Ensure the store can delete a retention policy and all shards under
Expand Down Expand Up @@ -143,6 +144,31 @@ func TestStore_CreateShard(t *testing.T) {
}
}

// TestStore_BadShard checks, for every registered index type, that a shard
// with a recorded open failure is not re-opened by a non-forced OpenShard
// call (the recorded error is returned instead), and that a forced OpenShard
// call bypasses the recorded failure.
func TestStore_BadShard(t *testing.T) {
	const errStr = "a shard open error"
	indexes := tsdb.RegisteredIndexes()
	for _, idx := range indexes {
		// Run each index type in its own function so the deferred cleanups
		// fire at the end of the iteration rather than at the end of the test.
		func() {
			s := MustOpenStore(idx)
			// The Close call must live inside a closure: arguments to a deferred
			// call are evaluated when the defer statement executes, so
			// `defer require.NoErrorf(t, s.Close(), ...)` would close the store
			// immediately instead of at function exit.
			defer func() {
				require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx)
			}()

			sh := tsdb.NewTempShard(idx)
			err := s.OpenShard(sh.Shard, false)
			require.NoError(t, err, "opening temp shard")
			// Same reasoning as above: defer the Close itself, not just the assertion.
			defer func() {
				require.NoError(t, sh.Close(), "closing temporary shard")
			}()

			s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr))
			err2 := s.OpenShard(sh.Shard, false)
			require.Error(t, err2, "no error opening bad shard")
			require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2)
			require.EqualError(t, err2, "opening shard previously failed with: "+errStr)

			// This should succeed with the force (and because opening an open shard automatically succeeds)
			require.NoError(t, s.OpenShard(sh.Shard, true), "forced re-opening previously failing shard")
		}()
	}
}

func TestStore_CreateMixedShards(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -2048,7 +2074,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
if err := s.OpenShard(sh, false); err != nil {
errC <- err
return
}
Expand Down Expand Up @@ -2133,7 +2159,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
if err := s.OpenShard(sh, false); err != nil {
errC <- err
return
}
Expand Down Expand Up @@ -2224,7 +2250,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
return
}
time.Sleep(500 * time.Microsecond)
if err := sh.Open(); err != nil {
if err := s.OpenShard(sh, false); err != nil {
errC <- err
return
}
Expand Down

0 comments on commit d0dd842

Please sign in to comment.