From 85580fa8c10a097ff28893f235db3cbc708b1c5e Mon Sep 17 00:00:00 2001
From: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
Date: Tue, 1 Feb 2022 21:51:26 -0800
Subject: [PATCH] fix: correctly handle MaxSeriesPerDatabaseExceeded (#23091) (#23093)

Check for the correctly returned PartialWriteError in
(*shard).validateSeriesAndFields, allow partial writes.

closes https://github.com/influxdata/influxdb/issues/23090

(cherry picked from commit 0c3dca883ee58ca204e6cb0d528bfe2b93e1956d)

closes https://github.com/influxdata/influxdb/issues/23092
---
 tsdb/index/inmem/inmem.go | 8 +++++---
 tsdb/shard.go             | 9 ++++++---
 tsdb/shard_test.go        | 2 +-
 tsdb/store_test.go        | 2 +-
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go
index a0250edb455..6c43703264a 100644
--- a/tsdb/index/inmem/inmem.go
+++ b/tsdb/index/inmem/inmem.go
@@ -194,7 +194,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu
 		i.mu.RLock()
 		if max := opt.Config.MaxSeriesPerDatabase; max > 0 && len(i.series)+len(keys) > max {
 			i.mu.RUnlock()
-			return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase}
+			return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase, series: len(i.series), keys: len(keys)}
 		}
 		i.mu.RUnlock()
 	}
@@ -1362,9 +1362,11 @@ func (itr *seriesIDIterator) nextKeys() error {
 // errMaxSeriesPerDatabaseExceeded is a marker error returned during series creation
 // to indicate that a new series would exceed the limits of the database.
 type errMaxSeriesPerDatabaseExceeded struct {
-	limit int
+	limit  int
+	series int
+	keys   int
 }
 
 func (e errMaxSeriesPerDatabaseExceeded) Error() string {
-	return fmt.Sprintf("max-series-per-database limit exceeded: (%d)", e.limit)
+	return fmt.Sprintf("max-series-per-database exceeded limit=%d series=%d keys=%d", e.limit, e.series, e.keys)
 }
diff --git a/tsdb/shard.go b/tsdb/shard.go
index 6b1a7239c36..573c9e264e8 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -615,9 +615,12 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
 	var droppedKeys [][]byte
 	if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice, tracker); err != nil {
 		switch err := err.(type) {
-		// TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer?
-		// Maybe we can just change it to be consistent if we change it also in all
-		// the places that construct it.
+		// (DSB) This was previously *PartialWriteError. Now catch pointer and value types.
+		case PartialWriteError:
+			reason = err.Reason
+			dropped += err.Dropped
+			droppedKeys = err.DroppedKeys
+			atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
 		case *PartialWriteError:
 			reason = err.Reason
 			dropped += err.Dropped
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index e4489445f1c..31e1be7ff0e 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -201,7 +201,7 @@ func TestMaxSeriesLimit(t *testing.T) {
 	err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
 	if err == nil {
 		t.Fatal("expected error")
-	} else if exp, got := `partial write: max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
+	} else if exp, got := `partial write: max-series-per-database exceeded limit=1000 series=1000 keys=1 dropped=1`, err.Error(); exp != got {
 		t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
 	}
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 64487d360de..05414e20c72 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -1366,7 +1366,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
 			to := from + pointsPerShard
 			if err := store.Store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[from:to]); err != nil {
-				if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") {
+				if !strings.Contains(err.Error(), "partial write: max-series-per-database exceeded limit") {
 					t.Fatal(err)
 				}
 			}
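
Note (not part of the patch): the fix works because a Go type switch matches an
error's dynamic type exactly, so the existing case for *PartialWriteError did not
catch a PartialWriteError returned by value; that value fell through to another
branch and the whole write was rejected instead of being treated as a partial
write. The standalone sketch below illustrates the pattern the patch relies on,
using simplified stand-in types (the real tsdb.PartialWriteError carries more
fields, such as DroppedKeys, and is handled inside (*Shard).validateSeriesAndFields).

package main

import "fmt"

// PartialWriteError is a simplified stand-in for tsdb.PartialWriteError.
type PartialWriteError struct {
	Reason  string
	Dropped int
}

func (e PartialWriteError) Error() string {
	return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
}

// classify mirrors the shape of the type switch in the patch: the value form
// and the pointer form must be listed as separate cases to catch both.
func classify(err error) string {
	switch err := err.(type) {
	case PartialWriteError: // value form, the case added by this patch
		return "partial write accepted (value): " + err.Reason
	case *PartialWriteError: // pointer form, already handled before the patch
		return "partial write accepted (pointer): " + err.Reason
	default:
		return "write rejected: " + err.Error()
	}
}

func main() {
	e := PartialWriteError{Reason: "max-series-per-database exceeded limit=1000", Dropped: 1}
	fmt.Println(classify(e))  // hits the value case
	fmt.Println(classify(&e)) // hits the pointer case
}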