Skip to content

Commit

Permalink
[dbnode] Add claims for index segments volume index
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Nov 6, 2020
1 parent 956db70 commit a457326
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 9 deletions.
32 changes: 31 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,32 @@ type BootstrapConfiguration struct {
// CacheSeriesMetadata determines whether individual bootstrappers cache
// series metadata across all calls (namespaces / shards / blocks).
CacheSeriesMetadata *bool `yaml:"cacheSeriesMetadata"`

// Verify specifies verification checks.
Verify *BootstrapVerifyConfiguration `yaml:"verify"`
}

// VerifyOrDefault returns verify configuration or default.
func (bsc BootstrapConfiguration) VerifyOrDefault() BootstrapVerifyConfiguration {
if bsc.Verify == nil {
return BootstrapVerifyConfiguration{}
}
return *bsc.Verify
}

// BootstrapVerifyConfiguration outlines verification checks to enable
// during a bootstrap.
type BootstrapVerifyConfiguration struct {
VerifyIndexSegments *bool `yaml:"verifyIndexSegments"`
}

// VerifyIndexSegmentsOrDefault returns whether to verify index segments
// or use default value.
func (c BootstrapVerifyConfiguration) VerifyIndexSegmentsOrDefault() bool {
if c.VerifyIndexSegments == nil {
return false
}
return *c.VerifyIndexSegments
}

// BootstrapFilesystemConfiguration specifies config for the fs bootstrapper.
Expand Down Expand Up @@ -218,10 +244,14 @@ func (bsc BootstrapConfiguration) New(
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migration().NewOptions()).
SetStorageOptions(opts)
SetStorageOptions(opts).
SetIndexSegmentsVerify(bsc.VerifyOrDefault().VerifyIndexSegmentsOrDefault())
if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
return nil, err
}
if err := fsbOpts.Validate(); err != nil {
return nil, err
}
bs, err = bfs.NewFileSystemBootstrapperProvider(fsbOpts, bs)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func TestConfiguration(t *testing.T) {
returnUnfulfilledForCorruptCommitLogFiles: false
peers: null
cacheSeriesMetadata: null
verify: null
blockRetrieve: null
cache:
series: null
Expand Down
13 changes: 7 additions & 6 deletions src/dbnode/persist/fs/index_read_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ func ReadIndexSegments(
success = false
)

if validate {
if err = reader.Validate(); err != nil {
return ReadIndexSegmentsResult{}, err
}
}

// Need to do this to guarantee we release all resources in case of failure.
defer func() {
if !success {
Expand Down Expand Up @@ -123,6 +117,13 @@ func ReadIndexSegments(
segments = append(segments, seg)
}

// Note: need to validate after all segment file sets read.
if validate {
if err = reader.Validate(); err != nil {
return ReadIndexSegmentsResult{}, err
}
}

// Indicate we don't need the defer() above to release any resources, as we are
// transferring ownership to the caller.
success = true
Expand Down
72 changes: 71 additions & 1 deletion src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/ratelimit"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/m3ninx/index/segment"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/m3db/m3/src/x/checked"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -260,7 +263,7 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis
// to uniquely identify a single FileSetFile on disk.

// work out the volume index for the next Index FileSetFile for the given namespace/blockstart
volumeIndex, err := NextIndexFileSetVolumeIndex(pm.opts.FilePathPrefix(), nsMetadata.ID(), blockStart)
volumeIndex, err := claimNextIndexFileSetVolumeIndex(pm.opts, nsMetadata, blockStart)
if err != nil {
return prepared, err
}
Expand Down Expand Up @@ -654,3 +657,70 @@ func (pm *persistManager) SetRuntimeOptions(value runtime.Options) {
pm.currRateLimitOpts = value.PersistRateLimitOptions()
pm.Unlock()
}

var (
indexVolumeIndexClaimsLock sync.Mutex
indexVolumeIndexClaims = make(map[string]indexVolumeIndexClaim)
)

type indexVolumeIndexClaim struct {
volumeIndex int
blockStart xtime.UnixNano
}

// claimNextIndexFileSetVolumeIndex ensures we never use the same volume index
// no matter what for an index volume from the same process, we always just
// select the next integer regardless if previous attempt to write to prior
// index failed or succeeded. This is ok since volume indexes do not need
// to be contiguous, they just need to be ascending.
func claimNextIndexFileSetVolumeIndex(
opts Options,
namespaceMetadata namespace.Metadata,
blockStartTime time.Time,
) (int, error) {
indexVolumeIndexClaimsLock.Lock()
defer indexVolumeIndexClaimsLock.Unlock()

filePathPrefix := opts.FilePathPrefix()
nowFn := opts.ClockOptions().NowFn()

// Reap any old entries that are out of retention.
retOpts := namespaceMetadata.Options().RetentionOptions()
indexOpts := namespaceMetadata.Options().IndexOptions()
rp, bs, t := retOpts.RetentionPeriod(), indexOpts.BlockSize(), nowFn()
earliestBlockStart := retention.FlushTimeStartForRetentionPeriod(rp, bs, t)
earliestBlockStartUnixNanos := xtime.ToUnixNano(earliestBlockStart)
for key, claim := range indexVolumeIndexClaims {
if claim.blockStart.Before(earliestBlockStartUnixNanos) {
delete(indexVolumeIndexClaims, key)
}
}

// Now check if previous claim exists.
blockStart := xtime.ToUnixNano(blockStartTime)
namespace := namespaceMetadata.ID()
key := fmt.Sprintf("%s/%s/%d", filePathPrefix, namespace.String(),
blockStart)

if curr, ok := indexVolumeIndexClaims[key]; ok {
// Already had a previous claim, return the next claim.
next := curr
next.volumeIndex++
indexVolumeIndexClaims[key] = next
return next.volumeIndex, nil
}

volumeIndex, err := NextIndexFileSetVolumeIndex(filePathPrefix, namespace,
blockStart.ToTime())
if err != nil {
return 0, err
}

// Set this as claimed (always return +1 for next attempt) and
// return.
indexVolumeIndexClaims[key] = indexVolumeIndexClaim{
volumeIndex: volumeIndex,
blockStart: blockStart,
}
return volumeIndex, nil
}
15 changes: 15 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ var (
// us splitting an index block into smaller pieces is moot because we'll
// pull a lot more data into memory if we create more than one at a time.
defaultBootstrapIndexNumProcessors = 1

// defaultIndexSegmentsVerify defines default for index segments validation.
defaultIndexSegmentsVerify = false
)

type options struct {
Expand All @@ -65,6 +68,7 @@ type options struct {
indexOpts index.Options
persistManager persist.Manager
compactor *compaction.Compactor
indexSegmentsVerify bool
bootstrapDataNumProcessors int
bootstrapIndexNumProcessors int
runtimeOptsMgr runtime.OptionsManager
Expand All @@ -84,6 +88,7 @@ func NewOptions() Options {
return &options{
instrumentOpts: instrument.NewOptions(),
resultOpts: result.NewOptions(),
indexSegmentsVerify: defaultIndexSegmentsVerify,
bootstrapDataNumProcessors: defaultBootstrapDataNumProcessors,
bootstrapIndexNumProcessors: defaultBootstrapIndexNumProcessors,
runtimeOptsMgr: runtime.NewOptionsManager(),
Expand Down Expand Up @@ -195,6 +200,16 @@ func (o *options) BoostrapIndexNumProcessors() int {
return o.bootstrapIndexNumProcessors
}

func (o *options) SetIndexSegmentsVerify(value bool) Options {
opts := *o
opts.indexSegmentsVerify = value
return &opts
}

func (o *options) IndexSegmentsVerify() bool {
return o.indexSegmentsVerify
}

func (o *options) SetRuntimeOptionsManager(value runtime.OptionsManager) Options {
opts := *o
opts.runtimeOptsMgr = value
Expand Down
19 changes: 18 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,14 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
}

if shouldFlush && satisifiedFlushRanges {
// Use debug level with full log fidelity.
s.log.Debug("building file set index segment", buildIndexLogFields...)
// Use info log with more high level attributes.
s.log.Info("rebuilding file set index segment",
zap.Stringer("namespace", ns.ID()),
zap.Int("totalEntries", totalEntries),
zap.Time("blockStart", blockStart),
zap.Time("blockEnd", blockEnd))
indexBlock, err = bootstrapper.PersistBootstrapIndexSegment(
ns,
requestedRanges,
Expand Down Expand Up @@ -932,12 +939,22 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
continue
}

fsOpts := s.fsopts
verify := s.opts.IndexSegmentsVerify()
if verify {
// Make sure for this call to read index segments
// to validate the index segment.
// If fails validation will rebuild since missing from
// fulfilled range.
fsOpts = fsOpts.SetIndexReaderAutovalidateIndexSegments(true)
}

readResult, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
ReaderOptions: fs.IndexReaderOpenOptions{
Identifier: infoFile.ID,
FileSetType: persist.FileSetFlushType,
},
FilesystemOptions: s.fsopts,
FilesystemOptions: fsOpts,
})
if err != nil {
s.log.Error("unable to read segments from index fileset",
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ type Options interface {
// work for bootstrapping data file sets.
BoostrapIndexNumProcessors() int

// SetIndexSegmentsVerify sets the value for whether to verify bootstrapped
// index segments.
SetIndexSegmentsVerify(value bool) Options

// IndexSegmentsVerify returns the value for whether to verify bootstrapped
// index segments.
IndexSegmentsVerify() bool

// SetRuntimeOptionsManager sets the runtime options manager.
SetRuntimeOptionsManager(value runtime.OptionsManager) Options

Expand Down

0 comments on commit a457326

Please sign in to comment.