Skip to content

Commit

Permalink
Use persist.FileSetFlushType instead of `persist.FileSetSnapshotTyp…
Browse files Browse the repository at this point in the history
…e` for second target data and index ranges if bootstrapping namespace is read only. Because of this change, bootstrappers won't keep second range blocks in memory for read only namespaces (`shouldPersist` will evaluate to true for them).
  • Loading branch information
Linas Naginionis committed Nov 11, 2021
1 parent 1b94908 commit d8f27d0
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (m *bootstrapManager) bootstrap() error {
Shards: bootstrapShards,
Hooks: hooks,
DataAccumulator: accumulator,
ReadOnly: ns.namespace.ReadOnly(),
})
}

Expand Down
66 changes: 41 additions & 25 deletions src/dbnode/storage/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,24 @@ func (b bootstrapProcess) Run(
}
namespaceDetails := make([]NamespaceDetails, 0, len(namespaces))
for _, namespace := range namespaces {
ropts := namespace.Metadata.Options().RetentionOptions()
idxopts := namespace.Metadata.Options().IndexOptions()
dataRanges := b.targetRangesForData(at, ropts)
indexRanges := b.targetRangesForIndex(at, ropts, idxopts)
firstRanges := b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
var (
ropts = namespace.Metadata.Options().RetentionOptions()
idxopts = namespace.Metadata.Options().IndexOptions()
readOnly = namespace.ReadOnly
dataRanges = b.targetRangesForData(at, ropts, readOnly)
indexRanges = b.targetRangesForIndex(at, ropts, idxopts, readOnly)
firstRanges = b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
)
)

namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
DataAccumulator: namespace.DataAccumulator,
Hooks: namespace.Hooks,
ReadOnly: namespace.ReadOnly,
DataTargetRange: dataRanges.firstRangeWithPersistTrue,
IndexTargetRange: indexRanges.firstRangeWithPersistTrue,
DataRunOptions: NamespaceRunOptions{
Expand All @@ -215,23 +220,24 @@ func (b bootstrapProcess) Run(
},
})
secondRanges := b.newShardTimeRanges(
dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards)
dataRanges.secondRange.Range, namespace.Shards)
namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
DataAccumulator: namespace.DataAccumulator,
Hooks: namespace.Hooks,
DataTargetRange: dataRanges.secondRangeWithPersistFalse,
IndexTargetRange: indexRanges.secondRangeWithPersistFalse,
ReadOnly: namespace.ReadOnly,
DataTargetRange: dataRanges.secondRange,
IndexTargetRange: indexRanges.secondRange,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: dataRanges.secondRange.RunOptions,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: indexRanges.secondRange.RunOptions,
},
})
namespaceDetails = append(namespaceDetails, NamespaceDetails{
Expand All @@ -248,7 +254,7 @@ func (b bootstrapProcess) Run(
}

bootstrapResult := NewNamespaceResults(namespacesRunFirst)
for _, namespaces := range []Namespaces{
for runIndex, namespaces := range []Namespaces{
namespacesRunFirst,
namespacesRunSecond,
} {
Expand All @@ -266,24 +272,23 @@ func (b bootstrapProcess) Run(
continue
}

// Check if snapshot-type ranges have advanced while bootstrapping previous ranges.
// If second run, check if snapshot-type ranges have advanced while bootstrapping previous ranges.
// If yes, return an error to force a retry
if persistConf := ns.DataRunOptions.RunOptions.PersistConfig(); persistConf.Enabled &&
persistConf.FileSetType == persist.FileSetSnapshotType {
if runIndex == 1 {
var (
now = xtime.ToUnixNano(b.nowFn())
nsOptions = ns.Metadata.Options()
upToDateDataRanges = b.targetRangesForData(now, nsOptions.RetentionOptions())
upToDateDataRanges = b.targetRangesForData(now, nsOptions.RetentionOptions(), ns.ReadOnly)
)
// Only checking data ranges. Since index blocks can only be a multiple of
// data block size, the ranges for index could advance only if data ranges
// have advanced, too (while opposite is not necessarily true)
if !upToDateDataRanges.secondRangeWithPersistFalse.Range.Equal(ns.DataTargetRange.Range) {
if !upToDateDataRanges.secondRange.Range.Equal(ns.DataTargetRange.Range) {
upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions.RetentionOptions(),
nsOptions.IndexOptions())
nsOptions.IndexOptions(), ns.ReadOnly)
fields := b.logFields(ns.Metadata, ns.Shards,
upToDateDataRanges.secondRangeWithPersistFalse.Range,
upToDateIndexRanges.secondRangeWithPersistFalse.Range)
upToDateDataRanges.secondRange.Range,
upToDateIndexRanges.secondRange.Range)
b.log.Error("time ranges of snapshot-type blocks advanced", fields...)
return NamespaceResults{}, ErrFileSetSnapshotTypeRangeAdvanced
}
Expand Down Expand Up @@ -445,27 +450,31 @@ func (b bootstrapProcess) logBootstrapResult(
// targetRangesForData returns the bootstrap target ranges for data blocks as
// of the given time. Every sizing parameter, including the block size, is read
// from the retention options; readOnly is passed through unchanged to
// targetRanges.
func (b bootstrapProcess) targetRangesForData(
	at xtime.UnixNano,
	ropts retention.Options,
	readOnly bool,
) targetRangesResult {
	opts := targetRangesOptions{
		retentionPeriod:       ropts.RetentionPeriod(),
		futureRetentionPeriod: ropts.FutureRetentionPeriod(),
		blockSize:             ropts.BlockSize(),
		bufferPast:            ropts.BufferPast(),
		bufferFuture:          ropts.BufferFuture(),
		readOnly:              readOnly,
	}
	return b.targetRanges(at, opts)
}

// targetRangesForIndex returns the bootstrap target ranges for index blocks as
// of the given time. It differs from the data variant only in the block size,
// which is taken from the index options; the remaining parameters come from
// the retention options and readOnly is passed through unchanged to
// targetRanges.
func (b bootstrapProcess) targetRangesForIndex(
	at xtime.UnixNano,
	ropts retention.Options,
	idxopts namespace.IndexOptions,
	readOnly bool,
) targetRangesResult {
	opts := targetRangesOptions{
		retentionPeriod:       ropts.RetentionPeriod(),
		futureRetentionPeriod: ropts.FutureRetentionPeriod(),
		blockSize:             idxopts.BlockSize(),
		bufferPast:            ropts.BufferPast(),
		bufferFuture:          ropts.BufferFuture(),
		readOnly:              readOnly,
	}
	return b.targetRanges(at, opts)
}

Expand All @@ -475,11 +484,12 @@ type targetRangesOptions struct {
blockSize time.Duration
bufferPast time.Duration
bufferFuture time.Duration
readOnly bool
}

type targetRangesResult struct {
firstRangeWithPersistTrue TargetRange
secondRangeWithPersistFalse TargetRange
firstRangeWithPersistTrue TargetRange
secondRange TargetRange
}

func (b bootstrapProcess) targetRanges(
Expand All @@ -499,6 +509,12 @@ func (b bootstrapProcess) targetRanges(
Truncate(opts.blockSize).
Add(opts.blockSize)

secondRangeFilesetType := persist.FileSetSnapshotType
if opts.readOnly {
// NB: If namespace is read-only, we don't want to keep blocks in memory.
secondRangeFilesetType = persist.FileSetFlushType
}

// NB(r): We want the large initial time range bootstrapped to
// bootstrap with persistence so we don't keep the full raw
// data in process until we finish bootstrapping which could
Expand All @@ -514,15 +530,15 @@ func (b bootstrapProcess) targetRanges(
FileSetType: persist.FileSetFlushType,
}),
},
secondRangeWithPersistFalse: TargetRange{
secondRange: TargetRange{
Range: xtime.Range{Start: midPoint, End: cutover},
RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{
Enabled: true,
// These blocks are still active so we'll have to keep them
// in memory, but we want to snapshot them as we receive them
// so that once bootstrapping completes we can still recover
// from just the commit log bootstrapper.
FileSetType: persist.FileSetSnapshotType,
FileSetType: secondRangeFilesetType,
}),
},
}
Expand Down
31 changes: 31 additions & 0 deletions src/dbnode/storage/bootstrap/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/sharding"
Expand Down Expand Up @@ -135,3 +136,33 @@ func TestBootstrapProcessRunActiveBlockAdvanced(t *testing.T) {
})
}
}

// TestTargetRangesFileSetTypeForReadOnlyNamespace checks that, for a read-only
// namespace, the second target range of both data and index is configured
// with the flush fileset type.
func TestTargetRangesFileSetTypeForReadOnlyNamespace(t *testing.T) {
	const readOnly = true

	var (
		process     = bootstrapProcess{processOpts: NewProcessOptions()}
		dataRanges  = process.targetRangesForData(xtime.Now(), retention.NewOptions(), readOnly)
		indexRanges = process.targetRangesForIndex(
			xtime.Now(), retention.NewOptions(), namespace.NewIndexOptions(), readOnly)
	)

	requireFilesetTypes(t, dataRanges, persist.FileSetFlushType)
	requireFilesetTypes(t, indexRanges, persist.FileSetFlushType)
}

// TestTargetRangesFileSetTypeForNonReadOnlyNamespace checks that, for a
// namespace that is not read-only, the second target range of both data and
// index is configured with the snapshot fileset type.
func TestTargetRangesFileSetTypeForNonReadOnlyNamespace(t *testing.T) {
	const readOnly = false

	var (
		process     = bootstrapProcess{processOpts: NewProcessOptions()}
		dataRanges  = process.targetRangesForData(xtime.Now(), retention.NewOptions(), readOnly)
		indexRanges = process.targetRangesForIndex(
			xtime.Now(), retention.NewOptions(), namespace.NewIndexOptions(), readOnly)
	)

	requireFilesetTypes(t, dataRanges, persist.FileSetSnapshotType)
	requireFilesetTypes(t, indexRanges, persist.FileSetSnapshotType)
}

// requireFilesetTypes asserts the persist configuration of both target ranges
// in the given result: the first range must have persistence enabled with the
// flush fileset type, and the second range must have persistence enabled with
// the expectedSecond fileset type.
func requireFilesetTypes(t *testing.T, ranges targetRangesResult, expectedSecond persist.FileSetType) {
	// Mark this function as a test helper so that failures are attributed to
	// the line in the calling test rather than to a line in this function.
	t.Helper()

	persistConfigFirstRange := ranges.firstRangeWithPersistTrue.RunOptions.PersistConfig()
	require.True(t, persistConfigFirstRange.Enabled)
	require.Equal(t, persist.FileSetFlushType, persistConfigFirstRange.FileSetType)

	persistConfigSecondRange := ranges.secondRange.RunOptions.PersistConfig()
	require.True(t, persistConfigSecondRange.Enabled)
	require.Equal(t, expectedSecond, persistConfigSecondRange.FileSetType)
}
4 changes: 4 additions & 0 deletions src/dbnode/storage/bootstrap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type ProcessNamespace struct {
DataAccumulator NamespaceDataAccumulator
// Hooks is a set of namespace bootstrap hooks.
Hooks NamespaceHooks
// ReadOnly is true if the namespace is read-only.
ReadOnly bool
}

// NamespaceHooks is a set of namespace bootstrap hooks.
Expand Down Expand Up @@ -221,6 +223,8 @@ type Namespace struct {
// IndexRunOptions are the options for the index bootstrap for this
// namespace.
IndexRunOptions NamespaceRunOptions
// ReadOnly is true if the namespace is read-only.
ReadOnly bool
}

// NamespaceRunOptions are the run options for a bootstrap process run.
Expand Down
9 changes: 5 additions & 4 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) {
gomock.InOrder(
ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil),
ns.EXPECT().Metadata().Return(meta),
ns.EXPECT().ReadOnly().Return(false),
ns.EXPECT().ID().Return(id),
ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -141,9 +142,9 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).Times(2)
ns.EXPECT().Metadata().Return(meta).Times(2)
ns.EXPECT().ReadOnly().Return(true).Times(2)
ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down Expand Up @@ -216,7 +217,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) {

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return(shards, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().ReadOnly().Return(false).Times(2)
ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down

0 comments on commit d8f27d0

Please sign in to comment.