Skip to content

Commit

Permalink
[dbnode] Fix index flush recovery when previous flush attempts have f…
Browse files Browse the repository at this point in the history
…ailed (#1574)

Prior to this change, if an index flush failed to complete and write out a checkpoint file, it would leave behind dirty files that could never be overwritten.

This changes the behavior to only verify, before writing, that a valid (complete) checkpoint file does not already exist, rather than checking whether each segment file exists before writing it.

It also updates all call sites to correctly always validate a checkpoint file rather than just check for existence in other parts of the persist/fs package.
  • Loading branch information
robskillington authored Apr 19, 2019
1 parent e7808f3 commit 7dad7ee
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

# UNRELEASED

## Bug Fixes

- **M3DB**: Fix index flush recovery when previous flush attempts have failed (#1574)

# 0.8.3 (2019-04-12)

## Performance
Expand Down
58 changes: 45 additions & 13 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,27 @@ var (

type fileOpener func(filePath string) (*os.File, error)

// LazyEvalBool is a boolean that is lazily evaluated. Its zero value is
// EvalNone, so an unset LazyEvalBool field reads as "not yet evaluated".
type LazyEvalBool uint8

const (
// EvalNone indicates the boolean has not been evaluated.
EvalNone LazyEvalBool = iota
// EvalTrue indicates the boolean has been evaluated to true.
EvalTrue
// EvalFalse indicates the boolean has been evaluated to false.
EvalFalse
)

// FileSetFile represents a set of FileSet files for a given block start
type FileSetFile struct {
ID FileSetFileIdentifier
AbsoluteFilepaths []string

CachedSnapshotTime time.Time
CachedSnapshotID uuid.UUID
filePathPrefix string
CachedSnapshotTime time.Time
CachedSnapshotID uuid.UUID
CachedHasCompleteCheckpointFile LazyEvalBool
filePathPrefix string
}

// SnapshotTimeAndID returns the snapshot time and id for the given FileSetFile.
Expand All @@ -89,9 +102,11 @@ func (f *FileSetFile) SnapshotTimeAndID() (time.Time, uuid.UUID, error) {
if f.IsZero() {
return time.Time{}, nil, errSnapshotTimeAndIDZero
}
if len(f.AbsoluteFilepaths) > 0 && !strings.Contains(f.AbsoluteFilepaths[0], snapshotDirName) {
if len(f.AbsoluteFilepaths) > 0 &&
!strings.Contains(f.AbsoluteFilepaths[0], snapshotDirName) {
return time.Time{}, nil, fmt.Errorf(
"tried to determine snapshot time and id of non-snapshot: %s", f.AbsoluteFilepaths[0])
"tried to determine snapshot time and id of non-snapshot: %s",
f.AbsoluteFilepaths[0])
}

if !f.CachedSnapshotTime.IsZero() || f.CachedSnapshotID != nil {
Expand All @@ -115,16 +130,33 @@ func (f FileSetFile) IsZero() bool {
return len(f.AbsoluteFilepaths) == 0
}

// HasCheckpointFile returns a bool indicating whether the given set of
// HasCompleteCheckpointFile returns a bool indicating whether the given set of
// fileset files has a checkpoint file.
func (f FileSetFile) HasCheckpointFile() bool {
func (f *FileSetFile) HasCompleteCheckpointFile() bool {
switch f.CachedHasCompleteCheckpointFile {
case EvalNone:
f.CachedHasCompleteCheckpointFile = f.evalHasCompleteCheckpointFile()
return f.HasCompleteCheckpointFile()
case EvalTrue:
return true
}
return false
}

// evalHasCompleteCheckpointFile scans the fileset's file paths for a
// checkpoint file and validates it, returning EvalTrue only when a complete
// (valid) checkpoint file is found.
func (f *FileSetFile) evalHasCompleteCheckpointFile() LazyEvalBool {
	for _, fileName := range f.AbsoluteFilepaths {
		if strings.Contains(fileName, checkpointFileSuffix) {
			exists, err := CompleteCheckpointFileExists(fileName)
			if err != nil {
				// Treat an unreadable candidate as not complete and keep
				// scanning the remaining paths.
				continue
			}
			if exists {
				return EvalTrue
			}
		}
	}

	return EvalFalse
}

// FileSetFilesSlice is a slice of FileSetFile
Expand Down Expand Up @@ -161,7 +193,7 @@ func (f FileSetFilesSlice) LatestVolumeForBlock(blockStart time.Time) (FileSetFi
break
}

if curr.HasCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
if curr.HasCompleteCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
bestSoFar = curr
bestSoFarExists = true
}
Expand Down Expand Up @@ -812,7 +844,7 @@ func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockSta
)
}

if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}

Expand Down Expand Up @@ -841,7 +873,7 @@ func IndexFileSetsAt(filePathPrefix string, namespace ident.ID, blockStart time.
matches.sortByTimeAscending()
for _, fileset := range matches {
if fileset.ID.BlockStart.Equal(blockStart) {
if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}
filesets = append(filesets, fileset)
Expand Down Expand Up @@ -1198,7 +1230,7 @@ func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, bl
return false, nil
}

return latest.HasCheckpointFile(), nil
return latest.HasCompleteCheckpointFile(), nil
}

// NextSnapshotMetadataFileIndex returns the next snapshot metadata file index.
Expand Down
76 changes: 63 additions & 13 deletions src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,11 @@ func TestNextIndexFileSetVolumeIndex(t *testing.T) {
curr = index

p := filesetPathFromTimeAndIndex(dataDir, blockStart, index, checkpointFileSuffix)
err = ioutil.WriteFile(p, []byte("bar"), defaultNewFileMode)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("bar")))

err = ioutil.WriteFile(p, digestBuf, defaultNewFileMode)
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -780,14 +784,38 @@ func TestMultipleForBlockStart(t *testing.T) {
require.Equal(t, numSnapshotsPerBlock-1, latestSnapshot.ID.VolumeIndex)
}

func TestSnapshotFileHasCheckPointFile(t *testing.T) {
require.Equal(t, true, FileSetFile{
AbsoluteFilepaths: []string{"123-checkpoint-0.db"},
}.HasCheckpointFile())
func TestSnapshotFileHasCompleteCheckpointFile(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

checkpointFilePath := path.Join(dir, "123-checkpoint-0.db")

// Test a valid complete checkpoint file
digestBuffer := digest.NewBuffer()
digestBuffer.WriteDigest(digest.Checksum([]byte{1, 2, 3}))
err := ioutil.WriteFile(checkpointFilePath, digestBuffer, defaultNewFileMode)
require.NoError(t, err)

// Check validates a valid checkpoint file
f := FileSetFile{
AbsoluteFilepaths: []string{checkpointFilePath},
}
require.Equal(t, true, f.HasCompleteCheckpointFile())

require.Equal(t, false, FileSetFile{
AbsoluteFilepaths: []string{"123-index-0.db"},
}.HasCheckpointFile())
// Check fails when checkpoint exists but not valid
err = ioutil.WriteFile(checkpointFilePath, []byte{42}, defaultNewFileMode)
require.NoError(t, err)
f = FileSetFile{
AbsoluteFilepaths: []string{checkpointFilePath},
}
require.Equal(t, false, f.HasCompleteCheckpointFile())

// Check ignores index file path
indexFilePath := path.Join(dir, "123-index-0.db")
f = FileSetFile{
AbsoluteFilepaths: []string{indexFilePath},
}
require.Equal(t, false, f.HasCompleteCheckpointFile())
}

func TestSnapshotDirPath(t *testing.T) {
Expand Down Expand Up @@ -1107,7 +1135,16 @@ func createDataFiles(t *testing.T,
} else {
infoFilePath = filesetPathFromTime(shardDir, ts, fileSuffix)
}
createFile(t, infoFilePath, nil)
var contents []byte
if fileSuffix == checkpointFileSuffix {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, infoFilePath, contents)
}
return dir
}
Expand All @@ -1121,13 +1158,26 @@ type indexFileSetFileIdentifiers []indexFileSetFileIdentifier

func (indexFilesets indexFileSetFileIdentifiers) create(t *testing.T, prefixDir string) {
for _, fileset := range indexFilesets {
fileSetFileIdentifiers{fileset.FileSetFileIdentifier}.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
idents := fileSetFileIdentifiers{fileset.FileSetFileIdentifier}
idents.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
}
}

type fileSetFileIdentifiers []FileSetFileIdentifier

func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fileSetType persist.FileSetType, fileSuffixes ...string) {
writeFile := func(t *testing.T, path string, contents []byte) {
if strings.Contains(path, checkpointFileSuffix) {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, path, contents)
}

for _, suffix := range fileSuffixes {
for _, fileset := range filesets {
switch fileset.FileSetContentType {
Expand All @@ -1141,10 +1191,10 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTime(shardDir, blockStart, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
path = filesetPathFromTimeAndIndex(shardDir, blockStart, 0, fileSuffix)
createFile(t, path, nil)
writeFile(t, path, nil)
default:
panic("unknown FileSetType")
}
Expand All @@ -1158,7 +1208,7 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTimeAndIndex(indexDir, blockStart, volumeIndex, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
fallthrough
default:
Expand Down
19 changes: 10 additions & 9 deletions src/dbnode/persist/fs/index_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,18 @@ func (w *indexWriter) Open(opts IndexWriterOpenOptions) error {
if err := os.MkdirAll(w.namespaceDir, w.newDirectoryMode); err != nil {
return err
}
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)
w.infoFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, infoFileSuffix)
w.digestFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, digestFileSuffix)
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)

exists, err := CompleteCheckpointFileExists(w.checkpointFilePath)
if err != nil {
return err
}
if exists {
return fmt.Errorf("checkpoint already exists for volume: %s",
w.checkpointFilePath)
}

return nil
}
Expand Down Expand Up @@ -175,14 +184,6 @@ func (w *indexWriter) WriteSegmentFileSet(
err := fmt.Errorf("unknown fileset type: %s", w.fileSetType)
return w.markSegmentWriteError(segType, segFileType, err)
}
exists, err := FileExists(filePath)
if err != nil {
return err
}
if exists {
err := fmt.Errorf("segment file type already exists at %s", filePath)
return w.markSegmentWriteError(segType, segFileType, err)
}

fd, err := OpenWritable(filePath, w.newFileMode)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/persist/fs/persist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package fs
import (
"errors"
"io"
"io/ioutil"
"os"
"testing"
"time"
Expand All @@ -33,9 +34,9 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment"
m3ninxfs "github.com/m3db/m3/src/m3ninx/index/segment/fst"
m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist"
m3test "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
m3test "github.com/m3db/m3/src/x/test"
xtest "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -327,9 +328,12 @@ func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) {
blockStart := time.Unix(1000, 0)
indexDir := createIndexDataDir(t, pm.filePathPrefix, testNs1ID)
checkpointFilePath := filesetPathFromTimeAndIndex(indexDir, blockStart, 0, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("foo")))

err := ioutil.WriteFile(checkpointFilePath, digestBuf, defaultNewFileMode)
require.NoError(t, err)
f.Close()

flush, err := pm.StartIndexPersist()
require.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

"go.uber.org/zap"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -402,9 +402,13 @@ func (s *commitLogSource) mostRecentCompleteSnapshotByBlockShard(
// CachedSnapshotTime field so that we can rely upon it from here on out.
_, _, err := mostRecentSnapshotVolume.SnapshotTimeAndID()
if err != nil {
namespace := mostRecentSnapshot.ID.Namespace
if namespace == nil {
namespace = ident.StringID("<nil>")
}
s.log.
With(
zap.Stringer("namespace", mostRecentSnapshot.ID.Namespace),
zap.Stringer("namespace", namespace),
zap.Time("blockStart", mostRecentSnapshot.ID.BlockStart),
zap.Uint32("shard", mostRecentSnapshot.ID.Shard),
zap.Int("index", mostRecentSnapshot.ID.VolumeIndex),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) {
VolumeIndex: 0,
},
// Make sure path passes the "is snapshot" check in SnapshotTimeAndID method.
AbsoluteFilepaths: []string{"snapshots/checkpoint"},
CachedSnapshotTime: start.Add(time.Minute),
AbsoluteFilepaths: []string{"snapshots/checkpoint"},
CachedHasCompleteCheckpointFile: fs.EvalTrue,
CachedSnapshotTime: start.Add(time.Minute),
},
}, nil
}
Expand Down
Loading

0 comments on commit 7dad7ee

Please sign in to comment.