Skip to content

Commit

Permalink
fix: correctly handle MaxSeriesPerDatabaseExceeded (#23091) (#23093)
Browse files Browse the repository at this point in the history
Check for the correctly returned PartialWriteError
in (*shard).validateSeriesAndFields, allow partial
writes.

closes #23090

(cherry picked from commit 0c3dca8)

closes #23092
  • Loading branch information
davidby-influx authored Feb 2, 2022
1 parent d7f910c commit 85580fa
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
8 changes: 5 additions & 3 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu
i.mu.RLock()
if max := opt.Config.MaxSeriesPerDatabase; max > 0 && len(i.series)+len(keys) > max {
i.mu.RUnlock()
return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase}
return errMaxSeriesPerDatabaseExceeded{limit: opt.Config.MaxSeriesPerDatabase, series: len(i.series), keys: len(keys)}
}
i.mu.RUnlock()
}
Expand Down Expand Up @@ -1362,9 +1362,11 @@ func (itr *seriesIDIterator) nextKeys() error {
// errMaxSeriesPerDatabaseExceeded is a marker error returned during series creation
// to indicate that a new series would exceed the limits of the database.
type errMaxSeriesPerDatabaseExceeded struct {
limit int
limit int
series int
keys int
}

func (e errMaxSeriesPerDatabaseExceeded) Error() string {
return fmt.Sprintf("max-series-per-database limit exceeded: (%d)", e.limit)
return fmt.Sprintf("max-series-per-database exceeded limit=%d series=%d keys=%d", e.limit, e.series, e.keys)
}
9 changes: 6 additions & 3 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,12 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
var droppedKeys [][]byte
if err := engine.CreateSeriesListIfNotExists(keys, names, tagsSlice, tracker); err != nil {
switch err := err.(type) {
// TODO(jmw): why is this a *PartialWriteError when everything else is not a pointer?
// Maybe we can just change it to be consistent if we change it also in all
// the places that construct it.
// (DSB) This was previously *PartialWriteError. Now catch pointer and value types.
case PartialWriteError:
reason = err.Reason
dropped += err.Dropped
droppedKeys = err.DroppedKeys
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
case *PartialWriteError:
reason = err.Reason
dropped += err.Dropped
Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestMaxSeriesLimit(t *testing.T) {
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
if err == nil {
t.Fatal("expected error")
} else if exp, got := `partial write: max-series-per-database limit exceeded: (1000) dropped=1`, err.Error(); exp != got {
} else if exp, got := `partial write: max-series-per-database exceeded limit=1000 series=1000 keys=1 dropped=1`, err.Error(); exp != got {
t.Fatalf("unexpected error message:\n\texp = %s\n\tgot = %s", exp, got)
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,7 +1366,7 @@ func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
to := from + pointsPerShard

if err := store.Store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[from:to]); err != nil {
if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") {
if !strings.Contains(err.Error(), "partial write: max-series-per-database exceeded limit") {
t.Fatal(err)
}
}
Expand Down

0 comments on commit 85580fa

Please sign in to comment.