diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index a6c872ac54e..b441156bc16 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -40,6 +40,10 @@ const ( // TSMFileExtension is the extension used for TSM files. TSMFileExtension = "tsm" + + // DefaultMaxSavedErrors is the number of errors that are stored by a TSMBatchKeyReader before + // subsequent errors are discarded + DefaultMaxSavedErrors = 100 ) var ( @@ -954,7 +958,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([ return nil, nil } - tsm, err := NewTSMBatchKeyIterator(size, fast, intC, tsmFiles, trs...) + tsm, err := NewTSMBatchKeyIterator(size, fast, DefaultMaxSavedErrors, intC, tsmFiles, trs...) if err != nil { return nil, err } @@ -1660,15 +1664,28 @@ type tsmBatchKeyIterator struct { // without decode merged blocks interrupt chan struct{} + + // maxErrors is the maximum number of errors to store before discarding. + maxErrors int + // overflowErrors is the number of errors we have ignored. + overflowErrors int } -func (t *tsmBatchKeyIterator) AppendError(err error) { - t.errs = append(t.errs, err) +func (t *tsmBatchKeyIterator) AppendError(err error) bool { + if t.maxErrors > len(t.errs) { + t.errs = append(t.errs, err) + // Was the error stored? + return true + } else { + // Was the error dropped + t.overflowErrors++ + return false + } } // NewTSMBatchKeyIterator returns a new TSM key iterator from readers. // size indicates the maximum number of values to encode in a single block. -func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) { +func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) { var iter []*BlockIterator for _, r := range readers { iter = append(iter, r.BlockIterator()) @@ -1689,6 +1706,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFil mergedBooleanValues: &tsdb.BooleanArray{}, mergedStringValues: &tsdb.StringArray{}, interrupt: interrupt, + maxErrors: maxErrors, }, nil } @@ -1916,7 +1934,12 @@ func (k *tsmBatchKeyIterator) Err() error { if len(k.errs) == 0 { return nil } - return k.errs + // Copy the errors before appending the dropped error count + var errs TSMErrors + errs = make([]error, 0, len(k.errs)+1) + errs = append(errs, k.errs...) + errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors)) + return errs } type cacheKeyIterator struct { diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index b413897d335..b842e6af2ca 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sort" + "strings" "testing" "github.com/stretchr/testify/require" @@ -1864,6 +1865,98 @@ func TestTSMReader_References(t *testing.T) { } } +func TestBatchKeyIterator_Errors(t *testing.T) { + const MaxErrors = 10 + + dir, name := createTestTSM(t) + defer os.RemoveAll(dir) + fr, err := os.Open(name) + if err != nil { + t.Fatalf("unexpected error opening file %s: %v", name, err) + } + r, err := NewTSMReader(fr) + if err != nil { + // Only have a deferred close if we could not create the TSMReader + defer func() { + if e := fr.Close(); e != nil { + t.Fatalf("unexpected error closing %s: %v", name, e) + } + }() + + t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err) + } + defer func() { + if e := r.Close(); e != nil { + t.Fatalf("error closing TSMReader for %s: %v", name, e) + } + }() + interrupts := make(chan struct{}) + var iter KeyIterator + if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil { + t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err) + } + var i int + for i = 0; i < MaxErrors*2; i++ { + saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i)) + if i < MaxErrors && !saved { + t.Fatalf("error unexpectedly not saved: %d", i) + } + if i >= MaxErrors && saved { + t.Fatalf("error unexpectedly saved: %d", i) + } + } + errs := iter.Err() + if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) { + t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt) + } + expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors) + if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 { + t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error()) + } +} + +func createTestTSM(t *testing.T) (dir string, name string) { + dir = MustTempDir() + f := mustTempFile(dir) + name = f.Name() + w, err := NewTSMWriter(f) + if err != nil { + f.Close() + t.Fatalf("unexpected error creating writer for %s: %v", name, err) + } + defer func() { + if e := w.Close(); e != nil { + t.Fatalf("write TSM close of %s: %v", name, err) + } + }() + + var data = map[string][]Value{ + "float": []Value{NewValue(1, 1.0)}, + "int": []Value{NewValue(1, int64(1))}, + "uint": []Value{NewValue(1, ^uint64(0))}, + "bool": []Value{NewValue(1, true)}, + "string": []Value{NewValue(1, "foo")}, + } + + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + if err := w.Write([]byte(k), data[k]); err != nil { + t.Fatalf("write TSM value: %v", err) + } + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("write TSM index: %v", err) + } + + return dir, name +} + func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) { index := NewIndexWriter() for i := 0; i < 100000; i++ {