From 5aff511e4036e98d307a29ca9ba93e07cf061823 Mon Sep 17 00:00:00 2001 From: Geoffrey Wossum Date: Tue, 17 Sep 2024 12:48:21 -0500 Subject: [PATCH] fix: do not rename files on mmap failure (#25340) If NewTSMReader() fails because mmap fails, do not rename the file, because the error is probably caused by vm.max_map_count being too low Closes: #25337 (cherry picked from commit ec412f793b0fbb0d4c1ee35ebbb1fbbc69baabf0) --- tsdb/engine/tsm1/engine.go | 3 +- tsdb/engine/tsm1/file_store.go | 43 ++++--- tsdb/engine/tsm1/file_store_internal_test.go | 114 +++++++++++++++++++ tsdb/engine/tsm1/file_store_test.go | 37 ++++++ tsdb/engine/tsm1/reader.go | 36 +++++- 5 files changed, 209 insertions(+), 24 deletions(-) create mode 100644 tsdb/engine/tsm1/file_store_internal_test.go diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 523de1ee4d9..7cfd660a94b 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay) } - fs := NewFileStore(path, etags) + fs := NewFileStore(path, etags, WithMadviseWillNeed(opt.Config.TSMWillNeed)) fs.openLimiter = opt.OpenLimiter if opt.FileStoreObserver != nil { fs.WithObserver(opt.FileStoreObserver) } - fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index a94b2d782ed..ae281e98114 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -176,9 +176,8 @@ type FileStore struct { currentGeneration int dir string - files []TSMFile - tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files. - openLimiter limiter.Fixed // limit the number of concurrent opening TSM files. + files []TSMFile + openLimiter limiter.Fixed // limit the number of concurrent opening TSM files. logger *zap.Logger // Logger to be used for important messages traceLogger *zap.Logger // Logger to be used when trace-logging is on. @@ -198,6 +197,8 @@ type FileStore struct { // newReaderBlockCount keeps track of the current new reader block requests. // If non-zero, no new TSMReader objects may be created. newReaderBlockCount int + + readerOptions []tsmReaderOption } // FileStat holds information about a TSM file on disk. @@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool { } // NewFileStore returns a new instance of FileStore based on the given directory. -func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore { +func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore { logger := zap.NewNop() fs := &FileStore{ dir: dir, @@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore { obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, copyFiles: runtime.GOOS == "windows", + readerOptions: options, } fs.purger.fileStore = fs return fs @@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error { defer f.openLimiter.Release() start := time.Now() - df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed)) + df, err := NewTSMReader(file, f.readerOptions...) f.logger.Info("Opened file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Duration("duration", time.Since(start))) - // If we are unable to read a TSM file then log the error, rename - // the file, and continue loading the shard without it. + // If we are unable to read a TSM file then log the error. if err != nil { if cerr := file.Close(); cerr != nil { f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr)) } - // If the file is corrupt, rename it and - // continue loading the shard without it. - f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) - if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { - f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) - readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)} + if errors.Is(err, MmapError{}) { + // An MmapError may indicate we have insufficient + // handles for the mmap call, in which case the file should + // be left untouched, and the vm.max_map_count be raised. + f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low", + zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) + readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)} + return + } else { + // If the file is corrupt, rename it and + // continue loading the shard without it. + f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err)) + if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil { + f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e)) + readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)} + return + } + readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)} return } - readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)} - return } df.WithObserver(f.obs) readerC <- &res{r: df} @@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF } } - tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed)) + tsm, err := NewTSMReader(fd, f.readerOptions...) if err != nil { if newName != oldName { if err1 := os.Rename(newName, oldName); err1 != nil { diff --git a/tsdb/engine/tsm1/file_store_internal_test.go b/tsdb/engine/tsm1/file_store_internal_test.go new file mode 100644 index 00000000000..8f0a1485b94 --- /dev/null +++ b/tsdb/engine/tsm1/file_store_internal_test.go @@ -0,0 +1,114 @@ +package tsm1 + +import ( + "github.com/influxdata/influxdb/v2/tsdb" +) + +var TestMmapInitFailOption = func(err error) tsmReaderOption { + return func(r *TSMReader) { + r.accessor = &badBlockAccessor{error: err} + } +} + +type badBlockAccessor struct { + error + initCalled bool +} + +func (b *badBlockAccessor) init() (*indirectIndex, error) { + b.initCalled = true + return nil, b.error +} + +func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) rename(path string) error { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) path() string { + //TODO implement me + panic("implement me") +} + +func (b *badBlockAccessor) close() error { + if !b.initCalled { + panic("close called without an init call") + } + b.initCalled = false + return nil +} + +func (b *badBlockAccessor) free() error { + //TODO implement me + panic("implement me") +} diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index c83446bd2ab..1a07b3ab1ee 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) { } } +func TestFileStore_OpenFail(t *testing.T) { + var err error + dir := t.TempDir() + + // Create a TSM file... + data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}} + + files, err := newFileDir(t, dir, data) + if err != nil { + fatal(t, "creating test files", err) + } + assert.Equal(t, 1, len(files)) + f := files[0] + + const mmapErrMsg = "mmap failure in test" + const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg + // With an mmap failure, the files should all be left where they are, because they are not corrupt + openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg))) + assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure") + + // With a non-mmap failure, the file failing to open should be moved aside + const otherErrMsg = "some Random Init Failure" + openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg)) + assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure") + assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure") +} + +func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) { + fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}, tsm1.TestMmapInitFailOption(initErr)) + err := fs.Open(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), fullErrMsg) + defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }() + assert.Equal(t, 0, fs.Count(), "file count mismatch") +} + func TestFileStore_Remove(t *testing.T) { dir := t.TempDir() diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 174c69071c2..b6cd2220b26 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -3,6 +3,7 @@ package tsm1 import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "math" @@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { } } +// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure // NewTSMReader returns a new TSMReader from the given file. func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { t := &TSMReader{} @@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { } t.size = stat.Size() t.lastModified = stat.ModTime().UnixNano() - t.accessor = &mmapAccessor{ - f: f, - mmapWillNeed: t.madviseWillNeed, + if t.accessor == nil { + t.accessor = &mmapAccessor{ + f: f, + mmapWillNeed: t.madviseWillNeed, + } } index, err := t.accessor.init() if err != nil { - _ = t.accessor.close() - return nil, err + cerr := t.accessor.close() + return nil, errors.Join(err, cerr) } t.index = index @@ -1314,6 +1318,24 @@ type mmapAccessor struct { index *indirectIndex } +type MmapError struct { + error +} + +func (m *MmapError) Unwrap() error { + return m.error +} + +func (m MmapError) Is(e error) bool { + _, oks := e.(MmapError) + _, okp := e.(*MmapError) + return oks || okp +} + +func NewMmapError(e error) MmapError { + return MmapError{error: e} +} + func (m *mmapAccessor) init() (*indirectIndex, error) { m.mu.Lock() defer m.mu.Unlock() @@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) { m.b, err = mmap(m.f, 0, int(stat.Size())) if err != nil { - return nil, err + // Wrap the error to let callers know this was an error + // from mmap, and may indicate vm.max_map_count is too low + return nil, NewMmapError(err) } if len(m.b) < 8 { return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")