Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Avoid loading blocks in memory for namespaces with snapshots disabled during bootstrapping #3919

Merged
merged 7 commits into from
Nov 17, 2021
83 changes: 46 additions & 37 deletions src/dbnode/storage/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/tracepoint"
Expand Down Expand Up @@ -188,14 +187,16 @@ func (b bootstrapProcess) Run(
}
namespaceDetails := make([]NamespaceDetails, 0, len(namespaces))
for _, namespace := range namespaces {
ropts := namespace.Metadata.Options().RetentionOptions()
idxopts := namespace.Metadata.Options().IndexOptions()
dataRanges := b.targetRangesForData(at, ropts)
indexRanges := b.targetRangesForIndex(at, ropts, idxopts)
firstRanges := b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
var (
nsOpts = namespace.Metadata.Options()
dataRanges = b.targetRangesForData(at, nsOpts)
indexRanges = b.targetRangesForIndex(at, nsOpts)
firstRanges = b.newShardTimeRanges(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
)
)

namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
Expand All @@ -215,23 +216,23 @@ func (b bootstrapProcess) Run(
},
})
secondRanges := b.newShardTimeRanges(
dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards)
dataRanges.secondRange.Range, namespace.Shards)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm why are we using “secondRange” instead of “secondRangeWithPersistFalse” here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if namespace is read-only, it will resolve to shouldPersist=true for the second range. Other namespaces will still resolve to shouldPersist=false as it was before these changes.

namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
DataAccumulator: namespace.DataAccumulator,
Hooks: namespace.Hooks,
DataTargetRange: dataRanges.secondRangeWithPersistFalse,
IndexTargetRange: indexRanges.secondRangeWithPersistFalse,
DataTargetRange: dataRanges.secondRange,
IndexTargetRange: indexRanges.secondRange,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: dataRanges.secondRange.RunOptions,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions,
RunOptions: indexRanges.secondRange.RunOptions,
},
})
namespaceDetails = append(namespaceDetails, NamespaceDetails{
Expand All @@ -247,12 +248,12 @@ func (b bootstrapProcess) Run(
return NamespaceResults{}, err
}

bootstrapResult := NewNamespaceResults(namespacesRunFirst)
for _, namespaces := range []Namespaces{
namespacesRunFirst,
namespacesRunSecond,
} {

var (
bootstrapResult = NewNamespaceResults(namespacesRunFirst)
namespacesToRun = []Namespaces{namespacesRunFirst, namespacesRunSecond}
lastRunIndex = len(namespacesToRun) - 1
)
for runIndex, namespaces := range namespacesToRun {
for _, entry := range namespaces.Namespaces.Iter() {
ns := entry.Value()

Expand All @@ -266,24 +267,22 @@ func (b bootstrapProcess) Run(
continue
}

// Check if snapshot-type ranges have advanced while bootstrapping previous ranges.
// If yes, return an error to force a retry
if persistConf := ns.DataRunOptions.RunOptions.PersistConfig(); persistConf.Enabled &&
persistConf.FileSetType == persist.FileSetSnapshotType {
// If last run, check if ranges have advanced while bootstrapping previous ranges.
// If yes, return an error to force a retry.
if runIndex == lastRunIndex {
var (
now = xtime.ToUnixNano(b.nowFn())
nsOptions = ns.Metadata.Options()
upToDateDataRanges = b.targetRangesForData(now, nsOptions.RetentionOptions())
upToDateDataRanges = b.targetRangesForData(now, nsOptions)
)
// Only checking data ranges. Since index blocks can only be a multiple of
// data block size, the ranges for index could advance only if data ranges
// have advanced, too (while opposite is not necessarily true)
if !upToDateDataRanges.secondRangeWithPersistFalse.Range.Equal(ns.DataTargetRange.Range) {
upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions.RetentionOptions(),
nsOptions.IndexOptions())
if !upToDateDataRanges.secondRange.Range.Equal(ns.DataTargetRange.Range) {
upToDateIndexRanges := b.targetRangesForIndex(now, nsOptions)
fields := b.logFields(ns.Metadata, ns.Shards,
upToDateDataRanges.secondRangeWithPersistFalse.Range,
upToDateIndexRanges.secondRangeWithPersistFalse.Range)
upToDateDataRanges.secondRange.Range,
upToDateIndexRanges.secondRange.Range)
b.log.Error("time ranges of snapshot-type blocks advanced", fields...)
return NamespaceResults{}, ErrFileSetSnapshotTypeRangeAdvanced
}
Expand Down Expand Up @@ -444,28 +443,31 @@ func (b bootstrapProcess) logBootstrapResult(

func (b bootstrapProcess) targetRangesForData(
at xtime.UnixNano,
ropts retention.Options,
nsOpts namespace.Options,
) targetRangesResult {
ropts := nsOpts.RetentionOptions()
return b.targetRanges(at, targetRangesOptions{
retentionPeriod: ropts.RetentionPeriod(),
futureRetentionPeriod: ropts.FutureRetentionPeriod(),
blockSize: ropts.BlockSize(),
bufferPast: ropts.BufferPast(),
bufferFuture: ropts.BufferFuture(),
snapshotEnabled: nsOpts.SnapshotEnabled(),
})
}

func (b bootstrapProcess) targetRangesForIndex(
at xtime.UnixNano,
ropts retention.Options,
idxopts namespace.IndexOptions,
nsOpts namespace.Options,
) targetRangesResult {
ropts := nsOpts.RetentionOptions()
return b.targetRanges(at, targetRangesOptions{
retentionPeriod: ropts.RetentionPeriod(),
futureRetentionPeriod: ropts.FutureRetentionPeriod(),
blockSize: idxopts.BlockSize(),
blockSize: nsOpts.IndexOptions().BlockSize(),
bufferPast: ropts.BufferPast(),
bufferFuture: ropts.BufferFuture(),
snapshotEnabled: nsOpts.SnapshotEnabled(),
})
}

Expand All @@ -475,11 +477,12 @@ type targetRangesOptions struct {
blockSize time.Duration
bufferPast time.Duration
bufferFuture time.Duration
snapshotEnabled bool
}

type targetRangesResult struct {
firstRangeWithPersistTrue TargetRange
secondRangeWithPersistFalse TargetRange
firstRangeWithPersistTrue TargetRange
secondRange TargetRange
}

func (b bootstrapProcess) targetRanges(
Expand All @@ -499,6 +502,12 @@ func (b bootstrapProcess) targetRanges(
Truncate(opts.blockSize).
Add(opts.blockSize)

secondRangeFilesetType := persist.FileSetSnapshotType
if !opts.snapshotEnabled {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
// NB: If snapshots are disabled for a namespace, we want to use flush type.
secondRangeFilesetType = persist.FileSetFlushType
}

// NB(r): We want the large initial time range bootstrapped to
// bootstrap with persistence so we don't keep the full raw
// data in process until we finish bootstrapping which could
Expand All @@ -514,15 +523,15 @@ func (b bootstrapProcess) targetRanges(
FileSetType: persist.FileSetFlushType,
}),
},
secondRangeWithPersistFalse: TargetRange{
secondRange: TargetRange{
Range: xtime.Range{Start: midPoint, End: cutover},
RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{
Enabled: true,
// These blocks are still active so we'll have to keep them
// in memory, but we want to snapshot them as we receive them
// so that once bootstrapping completes we can still recover
// from just the commit log bootstrapper.
FileSetType: persist.FileSetSnapshotType,
FileSetType: secondRangeFilesetType,
}),
},
}
Expand Down
33 changes: 33 additions & 0 deletions src/dbnode/storage/bootstrap/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/sharding"
Expand Down Expand Up @@ -135,3 +136,35 @@ func TestBootstrapProcessRunActiveBlockAdvanced(t *testing.T) {
})
}
}

func TestTargetRangesFileSetTypeForReadOnlyNamespace(t *testing.T) {
sut := bootstrapProcess{processOpts: NewProcessOptions()}
nsOpts := namespace.NewOptions().SetSnapshotEnabled(false)

rangesForData := sut.targetRangesForData(xtime.Now(), nsOpts)
rangesForIndex := sut.targetRangesForIndex(xtime.Now(), nsOpts)

requireFilesetTypes(t, rangesForData, persist.FileSetFlushType)
requireFilesetTypes(t, rangesForIndex, persist.FileSetFlushType)
}

func TestTargetRangesFileSetTypeForNonReadOnlyNamespace(t *testing.T) {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
sut := bootstrapProcess{processOpts: NewProcessOptions()}
nsOpts := namespace.NewOptions().SetSnapshotEnabled(true)

rangesForData := sut.targetRangesForData(xtime.Now(), nsOpts)
rangesForIndex := sut.targetRangesForIndex(xtime.Now(), nsOpts)

requireFilesetTypes(t, rangesForData, persist.FileSetSnapshotType)
requireFilesetTypes(t, rangesForIndex, persist.FileSetSnapshotType)
}

func requireFilesetTypes(t *testing.T, ranges targetRangesResult, expectedSecond persist.FileSetType) {
persistConfigFirstRange := ranges.firstRangeWithPersistTrue.RunOptions.PersistConfig()
require.True(t, persistConfigFirstRange.Enabled)
require.Equal(t, persist.FileSetFlushType, persistConfigFirstRange.FileSetType)

persistConfigSecondRange := ranges.secondRange.RunOptions.PersistConfig()
require.True(t, persistConfigSecondRange.Enabled)
require.Equal(t, expectedSecond, persistConfigSecondRange.FileSetType)
}
15 changes: 6 additions & 9 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xtest "github.com/m3db/m3/src/x/test"
)

Expand Down Expand Up @@ -141,9 +140,8 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil).Times(2)
ns.EXPECT().Metadata().Return(meta).Times(2)
ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down Expand Up @@ -216,7 +214,6 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) {

ns.EXPECT().PrepareBootstrap(gomock.Any()).Return(shards, nil).AnyTimes()
ns.EXPECT().Metadata().Return(meta).AnyTimes()

ns.EXPECT().
Bootstrap(gomock.Any(), gomock.Any()).
Return(nil).
Expand Down