Skip to content

Commit

Permalink
Skip bootstrapping not default index blocks missing default index vol…
Browse files Browse the repository at this point in the history
…ume. Emit a metric for index blocks failing validation.
  • Loading branch information
notbdu committed Nov 14, 2020
1 parent 30e8aeb commit 413290f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 23 deletions.
32 changes: 18 additions & 14 deletions src/dbnode/persist/fs/index_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package fs

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -41,6 +42,9 @@ const (
mmapPersistFsIndexName = "mmap.persist.fs.index"
)

// ErrIndexReaderValidationFailed is returned for corrupt index segments.
var ErrIndexReaderValidationFailed = errors.New("validation failed")

type indexReader struct {
opts Options
filePathPrefix string
Expand Down Expand Up @@ -305,52 +309,52 @@ func (r *indexReader) Validate() error {

func (r *indexReader) validateDigestsFileDigest() error {
if r.readDigests.digestsFileDigest != r.expectedDigestOfDigest {
return fmt.Errorf("read digests file checksum bad: expected=%d, actual=%d",
r.expectedDigestOfDigest, r.readDigests.digestsFileDigest)
return fmt.Errorf("(%w) read digests file checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, r.expectedDigestOfDigest, r.readDigests.digestsFileDigest)
}
return nil
}

func (r *indexReader) validateInfoFileDigest() error {
if r.readDigests.infoFileDigest != r.expectedDigest.InfoDigest {
return fmt.Errorf("read info file checksum bad: expected=%d, actual=%d",
r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest)
return fmt.Errorf("(%w) read info file checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest)
}
return nil
}

func (r *indexReader) validateSegmentFileDigest(segmentIdx, fileIdx int) error {
if segmentIdx >= len(r.readDigests.segments) {
return fmt.Errorf(
"have not read correct number of segments to validate segment %d checksums: "+
"(%w) have not read correct number of segments to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, segmentIdx+1, len(r.readDigests.segments))
ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.readDigests.segments))
}
if segmentIdx >= len(r.expectedDigest.SegmentDigests) {
return fmt.Errorf(
"have not read digest files correctly to validate segment %d checksums: "+
"(%w) have not read digest files correctly to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests))
ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests))
}

if fileIdx >= len(r.readDigests.segments[segmentIdx].files) {
return fmt.Errorf(
"have not read correct number of segment files to validate segment %d checksums: "+
"(%w) have not read correct number of segment files to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, fileIdx+1, len(r.readDigests.segments[segmentIdx].files))
ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, len(r.readDigests.segments[segmentIdx].files))
}
if fileIdx >= len(r.expectedDigest.SegmentDigests[segmentIdx].Files) {
return fmt.Errorf(
"have not read correct number of segment files to validate segment %d checksums: "+
"(%w) have not read correct number of segment files to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, fileIdx+1, len(r.expectedDigest.SegmentDigests[segmentIdx].Files))
ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, len(r.expectedDigest.SegmentDigests[segmentIdx].Files))
}

expected := r.expectedDigest.SegmentDigests[segmentIdx].Files[fileIdx].Digest
actual := r.readDigests.segments[segmentIdx].files[fileIdx].digest
if actual != expected {
return fmt.Errorf("read segment file %d for segment %d checksum bad: expected=%d, actual=%d",
segmentIdx, fileIdx, expected, actual)
return fmt.Errorf("(%w) read segment file %d for segment %d checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, segmentIdx, fileIdx, expected, actual)
}
return nil
}
Expand Down
70 changes: 61 additions & 9 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package fs

import (
"errors"
"fmt"
"sort"
"sync"
"time"

indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
Expand Down Expand Up @@ -84,8 +87,9 @@ type fileSystemSource struct {
}

type fileSystemSourceMetrics struct {
persistedIndexBlocksRead tally.Counter
persistedIndexBlocksWrite tally.Counter
persistedIndexBlocksRead tally.Counter
persistedIndexBlocksWrite tally.Counter
indexBlocksFailedValidation tally.Counter
}

func newFileSystemSource(opts Options) (bootstrap.Source, error) {
Expand All @@ -107,8 +111,9 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
newReaderFn: fs.NewReader,
deleteFilesFn: fs.DeleteFiles,
metrics: fileSystemSourceMetrics{
persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"),
persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"),
persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"),
persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"),
indexBlocksFailedValidation: scope.Counter("index-blocks-failed-validation"),
},
}
s.newReaderPoolOpts.Alloc = s.newReader
Expand Down Expand Up @@ -1010,6 +1015,14 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
return bootstrapFromIndexPersistedBlocksResult{}, err
}

// Sort info files by block start and index volume type (default comes first)
sort.Slice(infoFiles, func(i, j int) bool {
if infoFiles[i].Info.BlockStart != infoFiles[j].Info.BlockStart {
return infoFiles[i].Info.BlockStart < infoFiles[j].Info.BlockStart
}
return volumeTypeFromInfo(infoFiles[i].Info) == idxpersist.DefaultIndexVolumeType
})

for _, infoFile := range infoFiles {
if err := infoFile.Err.Error(); err != nil {
s.log.Error("unable to read index info file",
Expand All @@ -1022,7 +1035,25 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
}

info := infoFile.Info
indexBlockStart := xtime.UnixNano(info.BlockStart).ToTime()
indexBlockStartUnixNano := xtime.UnixNano(info.BlockStart)
indexBlockStart := indexBlockStartUnixNano.ToTime()
volumeType := volumeTypeFromInfo(info)

// NB(bodu): Skip non default index volumes for this block if we have not seen a default
// index volume already. We sort info files by block start and index volume so we should
// have seen a default volume by now if it exists and is not corrupted.
if volumeType != idxpersist.DefaultIndexVolumeType &&
indexBlockExistsInResults(res.result.index.IndexResults(), indexBlockStartUnixNano, volumeType) {
s.log.Info("skipping index fileset missing default index volume type",
zap.Stringer("namespace", ns.ID()),
zap.Stringer("blockStart", indexBlockStart),
zap.String("volumeType", string(volumeType)),
zap.Stringer("shardTimeRanges", shardTimeRanges),
zap.String("filepath", infoFile.Err.Filepath()),
)
continue
}

indexBlockRange := xtime.Range{
Start: indexBlockStart,
End: indexBlockStart.Add(indexBlockSize),
Expand Down Expand Up @@ -1078,6 +1109,10 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
zap.Time("blockStart", indexBlockStart),
zap.Int("volumeIndex", infoFile.ID.VolumeIndex),
)
// Emit a metric for failed validations.
if errors.Is(err, fs.ErrIndexReaderValidationFailed) {
s.metrics.indexBlocksFailedValidation.Inc(1)
}
continue
}

Expand All @@ -1094,10 +1129,6 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
for _, segment := range readResult.Segments {
persistedSegments = append(persistedSegments, result.NewSegment(segment, true))
}
volumeType := idxpersist.DefaultIndexVolumeType
if info.IndexVolumeType != nil {
volumeType = idxpersist.IndexVolumeType(info.IndexVolumeType.Value)
}
indexBlockByVolumeType := result.NewIndexBlockByVolumeType(indexBlockStart)
indexBlockByVolumeType.SetBlock(volumeType, result.NewIndexBlock(persistedSegments, segmentsFulfilled))
// NB(r): Don't need to call MarkFulfilled on the IndexResults here
Expand Down Expand Up @@ -1153,3 +1184,24 @@ func (r *runResult) mergedResult(other *runResult) *runResult {
index: result.MergedIndexBootstrapResult(r.index, other.index),
}
}

// volumeTypeFromInfo returns the index volume type carried by info. When
// info.IndexVolumeType is nil it falls back to
// idxpersist.DefaultIndexVolumeType.
func volumeTypeFromInfo(info indexpb.IndexVolumeInfo) idxpersist.IndexVolumeType {
	if explicit := info.IndexVolumeType; explicit != nil {
		return idxpersist.IndexVolumeType(explicit.Value)
	}
	return idxpersist.DefaultIndexVolumeType
}

// indexBlockExistsInResults reports whether results holds an entry keyed by
// blockStart whose GetBlock lookup succeeds for volumeType. It returns false
// when there is no entry for blockStart at all.
func indexBlockExistsInResults(
	results result.IndexResults,
	blockStart xtime.UnixNano,
	volumeType idxpersist.IndexVolumeType,
) bool {
	if byVolumeType, found := results[blockStart]; found {
		_, hasBlock := byVolumeType.GetBlock(volumeType)
		return hasBlock
	}
	return false
}

0 comments on commit 413290f

Please sign in to comment.