Skip to content

Commit

Permalink
fix: discard excessive errors
Browse files Browse the repository at this point in the history
The tsmBatchKeyIterator discards excessive errors to avoid
out-of-memory crashes when compacting very corrupt files.
Any error beyond DefaultMaxSavedErrors (100) will be
discarded instead of appended to the error slice.

closes #22328
  • Loading branch information
davidby-influx committed Sep 2, 2021
1 parent 50d3bca commit c4b80d5
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 4 deletions.
23 changes: 19 additions & 4 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (

// TSMFileExtension is the extension used for TSM files.
TSMFileExtension = "tsm"

// DefaultMaxSavedErrors is the number of errors that are stored by a TSMBatchKeyReader before
// subsequent errors are discarded
DefaultMaxSavedErrors = 100
)

var (
Expand Down Expand Up @@ -954,7 +958,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([
return nil, nil
}

tsm, err := NewTSMBatchKeyIterator(size, fast, intC, tsmFiles, trs...)
tsm, err := NewTSMBatchKeyIterator(size, fast, DefaultMaxSavedErrors, intC, tsmFiles, trs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1660,15 +1664,25 @@ type tsmBatchKeyIterator struct {
// without decode
merged blocks
interrupt chan struct{}

// maxErrors is the maximum number of errors to store before discarding.
maxErrors int
}

func (t *tsmBatchKeyIterator) AppendError(err error) {
t.errs = append(t.errs, err)
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
return true
} else {
// Was the error dropped
return false
}
}

// NewTSMBatchKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan struct{}, tsmFiles []string, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
Expand All @@ -1689,6 +1703,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, tsmFil
mergedBooleanValues: &tsdb.BooleanArray{},
mergedStringValues: &tsdb.StringArray{},
interrupt: interrupt,
maxErrors: maxErrors,
}, nil
}

Expand Down
86 changes: 86 additions & 0 deletions tsdb/engine/tsm1/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,92 @@ func TestTSMReader_References(t *testing.T) {
}
}

func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10

dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
if err != nil {
t.Fatalf("unexpected error opening file %s: %v", name, err)
}
r, err := NewTSMReader(fr)
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
}()

t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
for i := 0; i < MaxErrors*2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
if i < MaxErrors && !saved {
t.Fatalf("error unexpectedly not saved: %d", i)
}
if i >= MaxErrors && saved {
t.Fatalf("error unexpectedly saved: %d", i)
}
}
if errCnt := len(iter.Err().(TSMErrors)); errCnt != MaxErrors {
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
}
}

func createTestTSM(t *testing.T) (dir string, name string) {
dir = MustTempDir()
f := mustTempFile(dir)
name = f.Name()
w, err := NewTSMWriter(f)
if err != nil {
f.Close()
t.Fatalf("unexpected error creating writer for %s: %v", name, err)
}
defer func() {
if e := w.Close(); e != nil {
t.Fatalf("write TSM close of %s: %v", name, err)
}
}()

var data = map[string][]Value{
"float": []Value{NewValue(1, 1.0)},
"int": []Value{NewValue(1, int64(1))},
"uint": []Value{NewValue(1, ^uint64(0))},
"bool": []Value{NewValue(1, true)},
"string": []Value{NewValue(1, "foo")},
}

keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)

for _, k := range keys {
if err := w.Write([]byte(k), data[k]); err != nil {
t.Fatalf("write TSM value: %v", err)
}
}

if err := w.WriteIndex(); err != nil {
t.Fatalf("write TSM index: %v", err)
}

return dir, name
}

func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
index := NewIndexWriter()
for i := 0; i < 100000; i++ {
Expand Down

0 comments on commit c4b80d5

Please sign in to comment.