Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Cache the results of the commitlog bootstrapper between runs #2635

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"io"
"sync"
"time"
Expand Down Expand Up @@ -72,6 +73,13 @@ type commitLogSource struct {
newReaderFn newReaderFn

metrics commitLogSourceMetrics

// Only read the commit log once, since it can be expensive to read twice. Since the commit log is not sharded by
// time range we can't read just the configured time ranges for each pass. Instead we read the entire retention
// period during the first pass and skip the second pass.
//
// The bootstrapper is single threaded so we don't need a mutex for this.
alreadyRun bool
}

type bootstrapNamespace struct {
Expand Down Expand Up @@ -172,6 +180,14 @@ func (s *commitLogSource) Read(
ctx context.Context,
namespaces bootstrap.Namespaces,
) (bootstrap.NamespaceResults, error) {
// bail early if we already read the commit log in a previous pass.
if s.alreadyRun {
s.log.Debug("the entire range of the commit log has already been read. skipping this subsequent pass.")
none, _ := bootstrapper.NewNoOpNoneBootstrapperProvider().Provide()
return none.Bootstrap(ctx, namespaces)
}
s.alreadyRun = true

ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead)
defer span.Finish()

Expand Down Expand Up @@ -201,9 +217,9 @@ func (s *commitLogSource) Read(
shardTimeRanges := result.NewShardTimeRanges()
// NB(bodu): Use TargetShardTimeRanges which covers the entire original target shard range
// since the commitlog bootstrapper should run for the entire bootstrappable range per shard.
shardTimeRanges.AddRanges(ns.DataRunOptions.TargetShardTimeRanges)
shardTimeRanges.AddRanges(ns.DataRunOptions.AllShardTimeRanges)
if ns.Metadata.Options().IndexOptions().Enabled() {
shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges)
shardTimeRanges.AddRanges(ns.IndexRunOptions.AllShardTimeRanges)
}

namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{
Expand Down Expand Up @@ -524,6 +540,7 @@ func (s *commitLogSource) Read(
bootstrapResult := bootstrap.NamespaceResults{
Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}),
}

for _, ns := range namespaceResults {
id := ns.namespace.Metadata.ID()
dataResult := result.NewDataBootstrapResult()
Expand Down
30 changes: 16 additions & 14 deletions src/dbnode/storage/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func (b bootstrapProcess) Run(
dataRanges.firstRangeWithPersistTrue.Range,
namespace.Shards,
)
secondRanges := b.newShardTimeRanges(
dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards)
allRanges := firstRanges.Copy()
allRanges.AddRanges(secondRanges)
namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
Expand All @@ -181,18 +185,16 @@ func (b bootstrapProcess) Run(
DataTargetRange: dataRanges.firstRangeWithPersistTrue,
IndexTargetRange: indexRanges.firstRangeWithPersistTrue,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: firstRanges.Copy(),
TargetShardTimeRanges: firstRanges.Copy(),
RunOptions: dataRanges.firstRangeWithPersistTrue.RunOptions,
ShardTimeRanges: firstRanges.Copy(),
AllShardTimeRanges: allRanges.Copy(),
RunOptions: dataRanges.firstRangeWithPersistTrue.RunOptions,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: firstRanges.Copy(),
TargetShardTimeRanges: firstRanges.Copy(),
RunOptions: indexRanges.firstRangeWithPersistTrue.RunOptions,
ShardTimeRanges: firstRanges.Copy(),
AllShardTimeRanges: allRanges.Copy(),
RunOptions: indexRanges.firstRangeWithPersistTrue.RunOptions,
},
})
secondRanges := b.newShardTimeRanges(
dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards)
namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{
Metadata: namespace.Metadata,
Shards: namespace.Shards,
Expand All @@ -201,14 +203,14 @@ func (b bootstrapProcess) Run(
DataTargetRange: dataRanges.secondRangeWithPersistFalse,
IndexTargetRange: indexRanges.secondRangeWithPersistFalse,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions,
ShardTimeRanges: secondRanges.Copy(),
AllShardTimeRanges: allRanges.Copy(),
RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: secondRanges.Copy(),
TargetShardTimeRanges: secondRanges.Copy(),
RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions,
ShardTimeRanges: secondRanges.Copy(),
AllShardTimeRanges: allRanges.Copy(),
RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions,
},
})
}
Expand Down
9 changes: 4 additions & 5 deletions src/dbnode/storage/bootstrap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,10 @@ type NamespaceRunOptions struct {
// ShardTimeRanges are the time ranges for the shards that should be fulfilled
// by the bootstrapper. This changes each bootstrapper pass as time ranges are fulfilled.
ShardTimeRanges result.ShardTimeRanges
// TargetShardTimeRanges are the original target time ranges for shards and do not change
// each bootstrapper pass.
// NB(bodu): This is used by the commit log bootstrapper as it needs to run for the entire original
// target shard time ranges.
TargetShardTimeRanges result.ShardTimeRanges
// AllShardTimeRanges covers the entire retention period and does not change each bootstrapper pass.
// NB(rhall): This is used by the commit log bootstrapper since it only runs during a single pass of the bootstrapper.
// See the commit log bootstrapper for more details.
AllShardTimeRanges result.ShardTimeRanges
// RunOptions are the run options for the bootstrap run.
RunOptions RunOptions
}
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/bootstrap/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,14 @@ func BuildNamespacesTesterWithReaderIteratorPool(
Shards: shards,
DataAccumulator: acc,
DataRunOptions: NamespaceRunOptions{
ShardTimeRanges: ranges.Copy(),
TargetShardTimeRanges: ranges.Copy(),
RunOptions: runOpts,
ShardTimeRanges: ranges.Copy(),
AllShardTimeRanges: ranges.Copy(),
RunOptions: runOpts,
},
IndexRunOptions: NamespaceRunOptions{
ShardTimeRanges: ranges.Copy(),
TargetShardTimeRanges: ranges.Copy(),
RunOptions: runOpts,
ShardTimeRanges: ranges.Copy(),
AllShardTimeRanges: ranges.Copy(),
RunOptions: runOpts,
},
})
}
Expand Down