Skip to content

Commit

Permalink
fix: do not rename files on mmap failure (#25340)
Browse files Browse the repository at this point in the history
If NewTSMReader() fails because the mmap call fails, do not
rename the file: the error is probably caused by
vm.max_map_count being too low, not by a corrupt file.

Closes: #25337

(cherry picked from commit ec412f7)
  • Loading branch information
gwossum authored Sep 17, 2024
1 parent 5a59938 commit 5aff511
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 24 deletions.
3 changes: 1 addition & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}

fs := NewFileStore(path, etags)
fs := NewFileStore(path, etags, WithMadviseWillNeed(opt.Config.TSMWillNeed))
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)

Expand Down
43 changes: 27 additions & 16 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,8 @@ type FileStore struct {
currentGeneration int
dir string

files []TSMFile
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
files []TSMFile
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.

logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
Expand All @@ -198,6 +197,8 @@ type FileStore struct {
// newReaderBlockCount keeps track of the current new reader block requests.
// If non-zero, no new TSMReader objects may be created.
newReaderBlockCount int

readerOptions []tsmReaderOption
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}

// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
Expand All @@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
readerOptions: options,
}
fs.purger.fileStore = fs
return fs
Expand Down Expand Up @@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error {
defer f.openLimiter.Release()

start := time.Now()
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
df, err := NewTSMReader(file, f.readerOptions...)
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))

// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
// If we are unable to read a TSM file then log the error.
if err != nil {
if cerr := file.Close(); cerr != nil {
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
}
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
if errors.Is(err, MmapError{}) {
// An MmapError may indicate we have insufficient
// handles for the mmap call, in which case the file should
// be left untouched, and the vm.max_map_count be raised.
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
return
} else {
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
return
}
df.WithObserver(f.obs)
readerC <- &res{r: df}
Expand Down Expand Up @@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
}

tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
tsm, err := NewTSMReader(fd, f.readerOptions...)
if err != nil {
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
Expand Down
114 changes: 114 additions & 0 deletions tsdb/engine/tsm1/file_store_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package tsm1

import (
"github.com/influxdata/influxdb/v2/tsdb"
)

// TestMmapInitFailOption builds a tsmReaderOption for tests. Each time the
// option is applied it installs a new badBlockAccessor on the reader, whose
// init method returns err. It is declared as a variable rather than a
// function, in the same way as WithMadviseWillNeed.
var TestMmapInitFailOption = func(err error) tsmReaderOption {
	install := func(reader *TSMReader) {
		// A fresh accessor per reader: the accessor tracks per-reader
		// init/close state, so it must not be shared.
		failing := new(badBlockAccessor)
		failing.error = err
		reader.accessor = failing
	}
	return install
}

// badBlockAccessor is a test double for a TSM reader accessor whose init
// always fails. Only init and close are functional; every other method
// panics because the tests in this file are not expected to reach them.
type badBlockAccessor struct {
	// error is the value handed back by init.
	error
	// initCalled records that init ran and close has not yet been called.
	initCalled bool
}

// init records that it was called and fails with the configured error.
func (a *badBlockAccessor) init() (*indirectIndex, error) {
	a.initCalled = true
	return nil, a.error
}

// read is an unimplemented stub.
func (*badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
	panic("implement me")
}

// readAll is an unimplemented stub.
func (*badBlockAccessor) readAll(key []byte) ([]Value, error) {
	panic("implement me")
}

// readBlock is an unimplemented stub.
func (*badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
	panic("implement me")
}

// readFloatBlock is an unimplemented stub.
func (*badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
	panic("implement me")
}

// readFloatArrayBlock is an unimplemented stub.
func (*badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
	panic("implement me")
}

// readIntegerBlock is an unimplemented stub.
func (*badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
	panic("implement me")
}

// readIntegerArrayBlock is an unimplemented stub.
func (*badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
	panic("implement me")
}

// readUnsignedBlock is an unimplemented stub.
func (*badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
	panic("implement me")
}

// readUnsignedArrayBlock is an unimplemented stub.
func (*badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
	panic("implement me")
}

// readStringBlock is an unimplemented stub.
func (*badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
	panic("implement me")
}

// readStringArrayBlock is an unimplemented stub.
func (*badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
	panic("implement me")
}

// readBooleanBlock is an unimplemented stub.
func (*badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
	panic("implement me")
}

// readBooleanArrayBlock is an unimplemented stub.
func (*badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
	panic("implement me")
}

// readBytes is an unimplemented stub.
func (*badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
	panic("implement me")
}

// rename is an unimplemented stub.
func (*badBlockAccessor) rename(path string) error {
	panic("implement me")
}

// path is an unimplemented stub.
func (*badBlockAccessor) path() string {
	panic("implement me")
}

// close clears the init marker and succeeds. It panics when no init call
// preceded it, which flags a close that is not paired with an init.
func (a *badBlockAccessor) close() error {
	if a.initCalled {
		a.initCalled = false
		return nil
	}
	panic("close called without an init call")
}

// free is an unimplemented stub.
func (*badBlockAccessor) free() error {
	panic("implement me")
}
37 changes: 37 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) {
}
}

// TestFileStore_OpenFail checks what FileStore.Open does with a TSM file
// whose reader fails to initialize:
//   - when the failure is an MmapError, the file must be left in place,
//     because it is not corrupt;
//   - for any other failure, the file must be renamed with the
//     BadTSMFileExtension suffix.
func TestFileStore_OpenFail(t *testing.T) {
	dir := t.TempDir()

	// Create a TSM file...
	data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}

	files, err := newFileDir(t, dir, data)
	if err != nil {
		fatal(t, "creating test files", err)
	}
	// Use require rather than assert: indexing files[0] below would panic
	// on an empty slice instead of reporting a clean test failure.
	require.Len(t, files, 1)
	f := files[0]

	const mmapErrMsg = "mmap failure in test"
	const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
	// With an mmap failure, the files should all be left where they are, because they are not corrupt
	openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
	assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")

	// With a non-mmap failure, the file failing to open should be moved aside
	const otherErrMsg = "some Random Init Failure"
	openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
	assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
	assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
}

// openFail opens a FileStore on dir with a reader option that makes every
// TSM reader's init fail with initErr. It then checks that Open returns an
// error containing fullErrMsg and that no files were loaded. The store is
// closed before openFail returns.
func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
	t.Helper()

	fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}, tsm1.TestMmapInitFailOption(initErr))
	// Register the Close before any assertion that may abort the test, so
	// the store is released even when require.Error fails.
	defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()

	err := fs.Open(context.Background())
	// require, not assert: err.Error() below would dereference a nil error
	// if Open unexpectedly succeeded.
	require.Error(t, err)
	assert.Contains(t, err.Error(), fullErrMsg)
	assert.Equal(t, 0, fs.Count(), "file count mismatch")
}

func TestFileStore_Remove(t *testing.T) {
dir := t.TempDir()

Expand Down
36 changes: 30 additions & 6 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
}
}

// TODO(DSB) - add a tsmReaderOption in a test call that has the mmapAccessor mock a failure
// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
t := &TSMReader{}
Expand All @@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
if t.accessor == nil {
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
}
}

index, err := t.accessor.init()
if err != nil {
_ = t.accessor.close()
return nil, err
cerr := t.accessor.close()
return nil, errors.Join(err, cerr)
}

t.index = index
Expand Down Expand Up @@ -1314,6 +1318,24 @@ type mmapAccessor struct {
index *indirectIndex
}

// MmapError wraps an error so that callers can tell, via errors.Is, that
// the failure came from an mmap call rather than from a corrupt file.
type MmapError struct {
	error
}

// Unwrap returns the wrapped error.
//
// It uses a value receiver so that it is in the method set of both
// MmapError and *MmapError. NewMmapError returns a value, and with a
// pointer receiver that value would not expose Unwrap, hiding the wrapped
// error from errors.Unwrap, errors.Is and errors.As.
func (m MmapError) Unwrap() error {
	return m.error
}

// Is reports whether e is an MmapError or a *MmapError, regardless of the
// error it wraps. This lets errors.Is(err, MmapError{}) act as a type test.
func (m MmapError) Is(e error) bool {
	switch e.(type) {
	case MmapError, *MmapError:
		return true
	default:
		return false
	}
}

// NewMmapError wraps e in an MmapError.
func NewMmapError(e error) MmapError {
	return MmapError{error: e}
}

func (m *mmapAccessor) init() (*indirectIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {

m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return nil, err
// Wrap the error to let callers know this was an error
// from mmap, and may indicate vm.max_map_count is too low
return nil, NewMmapError(err)
}
if len(m.b) < 8 {
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
Expand Down

0 comments on commit 5aff511

Please sign in to comment.