From d0dd842149c334de738bcba267add6890126ccbc Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Mon, 13 Jun 2022 12:51:19 -0700 Subject: [PATCH] fix: remember shards that fail Open(), avoid repeated attempts (#23437) (#23444) If a shard cannot be opened, store its ID and last error. Prevent future attempts to open during this invocation of influxDB. This information is not persisted. closes https://github.com/influxdata/influxdb/issues/23428 closes https://github.com/influxdata/influxdb/issues/23426 (cherry picked from commit 54ac7e54edbbb8a495d5b1be626a21b0165fb77e) closes https://github.com/influxdata/influxdb/issues/23433 closes https://github.com/influxdata/influxdb/issues/23435 --- coordinator/points_writer.go | 2 +- tsdb/store.go | 68 ++++++++++++++++++++++++++++++++++-- tsdb/store_test.go | 32 +++++++++++++++-- 3 files changed, 95 insertions(+), 7 deletions(-) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 254ecccce89..afb01cd791e 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -407,7 +407,7 @@ func (w *PointsWriter) writeToShard(writeCtx tsdb.WriteContext, shard *meta.Shar // store has not actually created this shard, tell it to create it and // retry the write if err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true); err != nil { - w.Logger.Info("Write failed", zap.Uint64("shard", shard.ID), zap.Error(err)) + w.Logger.Warn("Write failed creating shard", zap.Uint64("shard", shard.ID), zap.Error(err)) atomic.AddInt64(&w.stats.WriteErr, 1) return err } diff --git a/tsdb/store.go b/tsdb/store.go index 9e2c8e93da0..7197eca6cbc 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -81,6 +81,28 @@ type StoreStatistics struct { SeriesCreated int64 } +type shardErrorMap struct { + mu sync.Mutex + shardErrors map[uint64]error +} + +func (se *shardErrorMap) setShardOpenError(shardID uint64, err error) { + se.mu.Lock() + defer se.mu.Unlock() + if err == nil { + delete(se.shardErrors, shardID) + } else { + se.shardErrors[shardID] = &ErrPreviousShardFail{error: fmt.Errorf("opening shard previously failed with: %w", err)} + } +} + +func (se *shardErrorMap) shardError(shardID uint64) (error, bool) { + se.mu.Lock() + defer se.mu.Unlock() + oldErr, hasErr := se.shardErrors[shardID] + return oldErr, hasErr +} + // Store manages shards and indexes for databases. type Store struct { mu sync.RWMutex @@ -97,6 +119,9 @@ type Store struct { // This prevents new shards from being created while old ones are being deleted. pendingShardDeletes map[uint64]struct{} + // Maintains a set of shards that failed to open + badShards shardErrorMap + // Epoch tracker helps serialize writes and deletes that may conflict. It // is stored by shard. epochs map[uint64]*epochTracker @@ -125,6 +150,7 @@ func NewStore(path string) *Store { sfiles: make(map[string]*SeriesFile), indexes: make(map[string]interface{}), pendingShardDeletes: make(map[uint64]struct{}), + badShards: shardErrorMap{shardErrors: make(map[uint64]error)}, epochs: make(map[uint64]*epochTracker), EngineOptions: NewEngineOptions(), Logger: logger, @@ -449,9 +475,9 @@ func (s *Store) loadShards() error { shard.CompactionDisabled = s.EngineOptions.CompactionDisabled shard.WithLogger(s.baseLogger) - err = shard.Open() + err = s.OpenShard(shard, false) if err != nil { - log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err)) + log.Error("Failed to open shard", logger.Shard(shardID), zap.Error(err)) resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)} return } @@ -607,6 +633,42 @@ func (s *Store) Shard(id uint64) *Shard { return sh } +type ErrPreviousShardFail struct { + error +} + +func (e ErrPreviousShardFail) Unwrap() error { + return e.error +} + +func (e ErrPreviousShardFail) Is(err error) bool { + _, sOk := err.(ErrPreviousShardFail) + _, pOk := err.(*ErrPreviousShardFail) + return sOk || pOk +} + +func (e ErrPreviousShardFail) Error() string { + return e.error.Error() +} + +func (s *Store) OpenShard(sh *Shard, force bool) error { + if sh == nil { + return errors.New("cannot open nil shard") + } + oldErr, bad := s.badShards.shardError(sh.ID()) + if force || !bad { + err := sh.Open() + s.badShards.setShardOpenError(sh.ID(), err) + return err + } else { + return oldErr + } +} + +func (s *Store) SetShardOpenErrorForTest(shardID uint64, err error) { + s.badShards.setShardOpenError(shardID, err) +} + // Shards returns a list of shards by id. func (s *Store) Shards(ids []uint64) []*Shard { s.mu.RLock() @@ -700,7 +762,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en shard.WithLogger(s.baseLogger) shard.EnableOnOpen = enabled - if err := shard.Open(); err != nil { + if err := s.OpenShard(shard, false); err != nil { return err } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 22b34fd83fb..8acc0ac4766 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/index/inmem" "github.com/influxdata/influxql" + "github.com/stretchr/testify/require" ) // Ensure the store can delete a retention policy and all shards under @@ -143,6 +144,31 @@ func TestStore_CreateShard(t *testing.T) { } } +func TestStore_BadShard(t *testing.T) { + const errStr = "a shard open error" + indexes := tsdb.RegisteredIndexes() + for _, idx := range indexes { + func() { + s := MustOpenStore(idx) + defer require.NoErrorf(t, s.Close(), "closing store with index type: %s", idx) + + sh := tsdb.NewTempShard(idx) + err := s.OpenShard(sh.Shard, false) + require.NoError(t, err, "opening temp shard") + defer require.NoError(t, sh.Close(), "closing temporary shard") + + s.SetShardOpenErrorForTest(sh.ID(), errors.New(errStr)) + err2 := s.OpenShard(sh.Shard, false) + require.Error(t, err2, "no error opening bad shard") + require.True(t, errors.Is(err2, tsdb.ErrPreviousShardFail{}), "exp: ErrPreviousShardFail, got: %v", err2) + require.EqualError(t, err2, "opening shard previously failed with: "+errStr) + + // This should succeed with the force (and because opening an open shard automatically succeeds) + require.NoError(t, s.OpenShard(sh.Shard, true), "forced re-opening previously failing shard") + }() + } +} + func TestStore_CreateMixedShards(t *testing.T) { t.Parallel() @@ -2048,7 +2074,7 @@ func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return } @@ -2133,7 +2159,7 @@ func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return } @@ -2224,7 +2250,7 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { return } time.Sleep(500 * time.Microsecond) - if err := sh.Open(); err != nil { + if err := s.OpenShard(sh, false); err != nil { errC <- err return }