Skip to content

Commit

Permalink
[dbnode] Properly rebuild index segments if they fail verification. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored and robskillington committed Nov 19, 2020
1 parent a457326 commit 55b6f56
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 108 deletions.
34 changes: 20 additions & 14 deletions src/dbnode/persist/fs/index_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package fs

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -41,6 +42,9 @@ const (
mmapPersistFsIndexName = "mmap.persist.fs.index"
)

// ErrIndexReaderValidationFailed is returned for corrupt index segemnts.
var ErrIndexReaderValidationFailed = errors.New("validation failed")

type indexReader struct {
opts Options
filePathPrefix string
Expand Down Expand Up @@ -305,52 +309,54 @@ func (r *indexReader) Validate() error {

func (r *indexReader) validateDigestsFileDigest() error {
if r.readDigests.digestsFileDigest != r.expectedDigestOfDigest {
return fmt.Errorf("read digests file checksum bad: expected=%d, actual=%d",
r.expectedDigestOfDigest, r.readDigests.digestsFileDigest)
return fmt.Errorf("(%w) read digests file checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, r.expectedDigestOfDigest, r.readDigests.digestsFileDigest)
}
return nil
}

func (r *indexReader) validateInfoFileDigest() error {
if r.readDigests.infoFileDigest != r.expectedDigest.InfoDigest {
return fmt.Errorf("read info file checksum bad: expected=%d, actual=%d",
r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest)
return fmt.Errorf("(%w) read info file checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest)
}
return nil
}

func (r *indexReader) validateSegmentFileDigest(segmentIdx, fileIdx int) error {
if segmentIdx >= len(r.readDigests.segments) {
return fmt.Errorf(
"have not read correct number of segments to validate segment %d checksums: "+
"(%w) have not read correct number of segments to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, segmentIdx+1, len(r.readDigests.segments))
ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.readDigests.segments))
}
if segmentIdx >= len(r.expectedDigest.SegmentDigests) {
return fmt.Errorf(
"have not read digest files correctly to validate segment %d checksums: "+
"(%w) have not read digest files correctly to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests))
ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests))
}

if fileIdx >= len(r.readDigests.segments[segmentIdx].files) {
return fmt.Errorf(
"have not read correct number of segment files to validate segment %d checksums: "+
"(%w) have not read correct number of segment files to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, fileIdx+1, len(r.readDigests.segments[segmentIdx].files))
ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1,
len(r.readDigests.segments[segmentIdx].files))
}
if fileIdx >= len(r.expectedDigest.SegmentDigests[segmentIdx].Files) {
return fmt.Errorf(
"have not read correct number of segment files to validate segment %d checksums: "+
"(%w) have not read correct number of segment files to validate segment %d checksums: "+
"need=%d, actual=%d",
segmentIdx, fileIdx+1, len(r.expectedDigest.SegmentDigests[segmentIdx].Files))
ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1,
len(r.expectedDigest.SegmentDigests[segmentIdx].Files))
}

expected := r.expectedDigest.SegmentDigests[segmentIdx].Files[fileIdx].Digest
actual := r.readDigests.segments[segmentIdx].files[fileIdx].digest
if actual != expected {
return fmt.Errorf("read segment file %d for segment %d checksum bad: expected=%d, actual=%d",
segmentIdx, fileIdx, expected, actual)
return fmt.Errorf("(%w) read segment file %d for segment %d checksum bad: expected=%d, actual=%d",
ErrIndexReaderValidationFailed, segmentIdx, fileIdx, expected, actual)
}
return nil
}
Expand Down
14 changes: 13 additions & 1 deletion src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type DataFileSetReaderStatus struct {
// DataReaderOpenOptions is options struct for the reader open method.
type DataReaderOpenOptions struct {
// Identifier allows to identify a FileSetFile.
Identifier FileSetFileIdentifier
Identifier FileSetFileIdentifier
// FileSetType is the file set type.
FileSetType persist.FileSetType
// StreamingEnabled enables using streaming methods, such as DataFileSetReader.StreamingRead.
Expand Down Expand Up @@ -680,3 +680,15 @@ type CrossBlockIterator interface {
// Reset resets the iterator to the given block records.
Reset(records []BlockRecord)
}

// IndexClaimsManager manages concurrent claims to volume indices per ns and block start.
// This allows multiple threads to safely increment the volume index.
type IndexClaimsManager interface {
ClaimNextIndexFileSetVolumeIndex(
md namespace.Metadata,
blockStart time.Time,
) (int, error)
}

// DeleteFilesFn deletes files passed in as arg.
type DeleteFilesFn func(files []string) error
15 changes: 15 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The collection of bootstrappers comprise the task executed when bootstrapping a

- `fs`: The filesystem bootstrapper, used to bootstrap as much data as possible from the local filesystem.
- `peers`: The peers bootstrapper, used to bootstrap any remaining data from peers. This is used for a full node join too.
- *NOTE*: For the node leave case, the peers bs will persist default volume type index filesets to disk with non-overlapping shard time ranges to avoid re-building the entire index segment w/ new shards.
- `commitlog`: The commit log bootstrapper, currently only used in the case that peers bootstrapping fails. Once the current block is being snapshotted frequently to disk it might be faster and make more sense to not actively use the peers bootstrapper and just use a combination of the filesystem bootstrapper and the minimal time range required from the commit log bootstrapper.
- *NOTE*: the commitlog bootstrapper is special cased in that it runs for the *entire* bootstrappable range per shard whereas other bootstrappers fill in the unfulfilled gaps as bootstrapping progresses.

Expand Down
Loading

0 comments on commit 55b6f56

Please sign in to comment.