From f3e03275dc8bb76687a7b6eb3ec8107bf6d73a7a Mon Sep 17 00:00:00 2001
From: Rui Hu
Date: Thu, 5 Jan 2023 17:49:48 -0500
Subject: [PATCH] ccl/backupccl: add new split and scatter processor that
 generates import spans

Previously, restore created all of its import spans at once and stored
them in memory. OOMs caused by the size of these import spans when
restoring large backups with many incremental chains have been the
cause of many escalations.

This patch modifies import span creation so that import spans are
generated one at a time. This span generator is then used in the split
and scatter processor to generate the import spans that are used in the
rest of restore, instead of having the spans specified in the
processor's spec.

A future patch will add memory monitoring to the import span generation
to further safeguard against OOMs in restore.

This patch also changes the import span generation algorithm. The
cluster setting `bulkio.restore.use_simple_import_spans` is introduced
in this patch; if set to true, it reverts the algorithm back to
makeSimpleImportSpans.

Release note: None
---
 pkg/ccl/backupccl/BUILD.bazel                 |   1 +
 pkg/ccl/backupccl/backup_metadata_test.go     |  16 +-
 .../backupccl/backupinfo/backup_metadata.go   |  77 +--
 .../backupccl/backuprand/backup_rand_test.go  |   8 +-
 pkg/ccl/backupccl/bench_covering_test.go      |  24 +-
 .../generative_split_and_scatter_processor.go | 384 ++++++++++++++
 pkg/ccl/backupccl/restore_job.go              | 166 +++---
 .../backupccl/restore_processor_planning.go   | 123 ++---
 pkg/ccl/backupccl/restore_span_covering.go    | 483 +++++++++++++++++-
 .../backupccl/restore_span_covering_test.go   | 125 +++--
 pkg/sql/execinfrapb/api.go                    |   4 +
 pkg/sql/execinfrapb/flow_diagram.go           |   6 +
 pkg/sql/execinfrapb/processors.proto          |   1 +
 pkg/sql/execinfrapb/processors_bulk_io.proto  |  30 ++
 pkg/sql/rowexec/processors.go                 |  12 +
 15 files changed, 1236 insertions(+), 224 deletions(-)
 create mode 100644 pkg/ccl/backupccl/generative_split_and_scatter_processor.go

diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 8f0148efeedb..4c8af2228502 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
         "backup_telemetry.go",
         "create_scheduled_backup.go",
         "file_sst_sink.go",
+        "generative_split_and_scatter_processor.go",
         "key_rewriter.go",
         "restoration_data.go",
         "restore_data_processor.go",
diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go
index c9338be5365c..955bbd7d3a0b 100644
--- a/pkg/ccl/backupccl/backup_metadata_test.go
+++ b/pkg/ccl/backupccl/backup_metadata_test.go
@@ -187,18 +187,22 @@ func checkFiles(
 	ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
 ) {
 	var metaFiles []backuppb.BackupManifest_File
-	var file backuppb.BackupManifest_File
 	it, err := bm.NewFileIter(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer it.Close()
-	for it.Next(&file) {
-		metaFiles = append(metaFiles, file)
-	}
-	if it.Err() != nil {
-		t.Fatal(it.Err())
+	for ; ; it.Next() {
+		ok, err := it.Valid()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !ok {
+			break
+		}
+
+		metaFiles = append(metaFiles, *it.Value())
 	}
 
 	require.Equal(t, m.Files, metaFiles)
diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go
index efc648336e9c..67adef4b782c 100644
--- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go
+++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go
@@ -262,6 +262,14 @@ func writeDescsToMetadata(
 	return nil
 }
 
+func 
FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int { + if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 { + return cmp + } + + return strings.Compare(left.Path, right.Path) +} + func writeFilesSST( ctx context.Context, m *backuppb.BackupManifest, @@ -280,8 +288,7 @@ func writeFilesSST( // Sort and write all of the files into a single file info SST. sort.Slice(m.Files, func(i, j int) bool { - cmp := m.Files[i].Span.Key.Compare(m.Files[j].Span.Key) - return cmp < 0 || (cmp == 0 && strings.Compare(m.Files[i].Path, m.Files[j].Path) < 0) + return FileCmp(m.Files[i], m.Files[j]) < 0 }) for i := range m.Files { @@ -1015,10 +1022,11 @@ func (si *SpanIterator) Next(span *roachpb.Span) bool { return false } -// FileIterator is a simple iterator to iterate over stats.TableStatisticProtos. +// FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File. type FileIterator struct { mergedIterator storage.SimpleMVCCIterator err error + file *backuppb.BackupManifest_File } // NewFileIter creates a new FileIterator for the backup metadata. @@ -1076,40 +1084,51 @@ func (fi *FileIterator) Close() { fi.mergedIterator.Close() } -// Err returns the iterator's error. -func (fi *FileIterator) Err() error { - return fi.err -} - -// Next retrieves the next file in the iterator. -// -// Next returns true if next element was successfully unmarshalled into file, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (fi *FileIterator) Next(file *backuppb.BackupManifest_File) bool { +// Valid indicates whether or not the iterator is pointing to a valid value. +func (fi *FileIterator) Valid() (bool, error) { if fi.err != nil { - return false + return false, fi.err } - valid, err := fi.mergedIterator.Valid() - if err != nil || !valid { - fi.err = err - return false - } - v, err := fi.mergedIterator.UnsafeValue() - if err != nil { + if ok, err := fi.mergedIterator.Valid(); !ok { fi.err = err - return false + return ok, err } - err = protoutil.Unmarshal(v, file) - if err != nil { - fi.err = err - return false + + if fi.file == nil { + v, err := fi.mergedIterator.UnsafeValue() + if err != nil { + fi.err = err + return false, fi.err + } + + file := &backuppb.BackupManifest_File{} + err = protoutil.Unmarshal(v, file) + if err != nil { + fi.err = err + return false, fi.err + } + fi.file = file } + return true, nil +} + +// Value returns the current value of the iterator, if valid. +func (fi *FileIterator) Value() *backuppb.BackupManifest_File { + return fi.file +} +// Next advances the iterator the the next value. +func (fi *FileIterator) Next() { fi.mergedIterator.Next() - return true + fi.file = nil +} + +// Reset resets the iterator to the first value. +func (fi *FileIterator) Reset() { + fi.mergedIterator.SeekGE(storage.MVCCKey{}) + fi.err = nil + fi.file = nil } // DescIterator is a simple iterator to iterate over descpb.Descriptors. diff --git a/pkg/ccl/backupccl/backuprand/backup_rand_test.go b/pkg/ccl/backupccl/backuprand/backup_rand_test.go index e462443a47e3..8dd6ccc0fc01 100644 --- a/pkg/ccl/backupccl/backuprand/backup_rand_test.go +++ b/pkg/ccl/backupccl/backuprand/backup_rand_test.go @@ -33,7 +33,9 @@ import ( // randomly generated tables and verifies their data and schema are preserved. // It tests that full database backup as well as all subsets of per-table backup // roundtrip properly. 
50% of the time, the test runs the restore with the -// schema_only parameter, which does not restore any rows from user tables. +// schema_only parameter, which does not restore any rows from user tables. The +// test will also run with bulkio.restore.use_simple_import_spans set to true +// 50% of the time. func TestBackupRestoreRandomDataRoundtrips(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -72,6 +74,10 @@ func TestBackupRestoreRandomDataRoundtrips(t *testing.T) { runSchemaOnlyExtension = ", schema_only" } + if rng.Intn(2) == 0 { + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.use_simple_import_spans = true") + } + tables := sqlDB.Query(t, `SELECT name FROM crdb_internal.tables WHERE database_name = 'rand' AND schema_name = 'public'`) var tableNames []string diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 5ecf2fb4aee9..a832efe65bb0 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -14,6 +14,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" @@ -27,6 +29,7 @@ func BenchmarkCoverageChecks(b *testing.B) { r, _ := randutil.NewTestRand() for _, numBackups := range []int{1, 7, 24, 24 * 4} { + numBackups := numBackups b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) { for _, numSpans := range []int{10, 20, 100} { b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) { @@ -61,6 +64,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) { ctx := context.Background() r, _ := randutil.NewTestRand() for _, numBackups := range []int{1, 2, 24, 24 * 4} { + numBackups := numBackups b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) { for _, baseFiles := range []int{0, 100, 10000} { b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) { @@ -82,10 +86,22 @@ func BenchmarkRestoreEntryCover(b *testing.B) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(b, err) - cov, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, - nil, 0) - require.NoError(b, err) + + spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + defer close(spanCh) + return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups, + layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false) + }) + + var cov []execinfrapb.RestoreSpanEntry + for entry := range spanCh { + cov = append(cov, entry) + } + + require.NoError(b, g.Wait()) b.ReportMetric(float64(len(cov)), "coverSize") } }) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go new file mode 100644 index 000000000000..bffa2482329e --- /dev/null +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -0,0 +1,384 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowexec" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/logtags" +) + +const generativeSplitAndScatterProcessorName = "generativeSplitAndScatter" + +var generativeSplitAndScatterOutputTypes = []*types.T{ + types.Bytes, // Span key for the range router + types.Bytes, // RestoreDataEntry bytes +} + +// generativeSplitAndScatterProcessor is given a backup chain, whose manifests +// are specified in URIs and iteratively generates RestoreSpanEntries to be +// distributed across the cluster. Depending on which node the span ends up on, +// it forwards RestoreSpanEntry as bytes along with the key of the span on a +// row. It expects an output RangeRouter and before it emits each row, it +// updates the entry in the RangeRouter's map with the destination of the +// scatter. +type generativeSplitAndScatterProcessor struct { + execinfra.ProcessorBase + + flowCtx *execinfra.FlowCtx + spec execinfrapb.GenerativeSplitAndScatterSpec + output execinfra.RowReceiver + + scatterer splitAndScatterer + // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for + // it to finish. + cancelScatterAndWaitForWorker func() + + doneScatterCh chan entryNode + // A cache for routing datums, so only 1 is allocated per node. + routingDatumCache map[roachpb.NodeID]rowenc.EncDatum + scatterErr error +} + +var _ execinfra.Processor = &generativeSplitAndScatterProcessor{} + +func newGenerativeSplitAndScatterProcessor( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + processorID int32, + spec execinfrapb.GenerativeSplitAndScatterSpec, + post *execinfrapb.PostProcessSpec, + output execinfra.RowReceiver, +) (execinfra.Processor, error) { + + db := flowCtx.Cfg.DB + kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys, + false /* restoreTenantFromStream */) + if err != nil { + return nil, err + } + + scatterer := makeSplitAndScatterer(db.KV(), kr) + if spec.ValidateOnly { + nodeID, _ := flowCtx.NodeID.OptionalNodeID() + scatterer = noopSplitAndScatterer{nodeID} + } + ssp := &generativeSplitAndScatterProcessor{ + flowCtx: flowCtx, + spec: spec, + output: output, + scatterer: scatterer, + // Large enough so that it never blocks. 
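+		// The split and scatter worker sends at most one entry per import span
+		// and spec.NumEntries is the total number of import spans, so a buffer
+		// of this size can hold every entry without blocking.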
+ doneScatterCh: make(chan entryNode, spec.NumEntries), + routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum), + } + if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ + execinfra.ProcStateOpts{ + InputsToDrain: nil, // there are no inputs to drain + TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { + ssp.close() + return nil + }, + }); err != nil { + return nil, err + } + return ssp, nil +} + +// Start is part of the RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) { + ctx = logtags.AddTag(ctx, "job", gssp.spec.JobID) + ctx = gssp.StartInternal(ctx, generativeSplitAndScatterProcessorName) + // Note that the loop over doneScatterCh in Next should prevent the goroutine + // below from leaking when there are no errors. However, if that loop needs to + // exit early, runSplitAndScatter's context will be canceled. + scatterCtx, cancel := context.WithCancel(ctx) + workerDone := make(chan struct{}) + gssp.cancelScatterAndWaitForWorker = func() { + cancel() + <-workerDone + } + if err := gssp.flowCtx.Stopper().RunAsyncTaskEx(scatterCtx, stop.TaskOpts{ + TaskName: "generativeSplitAndScatter-worker", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { + gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh) + cancel() + close(gssp.doneScatterCh) + close(workerDone) + }); err != nil { + gssp.scatterErr = err + cancel() + close(workerDone) + } +} + +// Next implements the execinfra.RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) Next() ( + rowenc.EncDatumRow, + *execinfrapb.ProducerMetadata, +) { + if gssp.State != execinfra.StateRunning { + return nil, gssp.DrainHelper() + } + + scatteredEntry, ok := <-gssp.doneScatterCh + if ok { + entry := scatteredEntry.entry + entryBytes, err := protoutil.Marshal(&entry) + if err != nil { + gssp.MoveToDraining(err) + return nil, gssp.DrainHelper() + } + + // The routing datums informs the router which output stream should be used. + routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node] + if !ok { + routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node)) + gssp.routingDatumCache[scatteredEntry.node] = routingDatum + } + + row := rowenc.EncDatumRow{ + routingDatum, + rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(entryBytes))), + } + return row, nil + } + + if gssp.scatterErr != nil { + gssp.MoveToDraining(gssp.scatterErr) + return nil, gssp.DrainHelper() + } + + gssp.MoveToDraining(nil /* error */) + return nil, gssp.DrainHelper() +} + +// ConsumerClosed is part of the RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) ConsumerClosed() { + // The consumer is done, Next() will not be called again. + gssp.close() +} + +// close stops the production workers. This needs to be called if the consumer +// runs into an error and stops consuming scattered entries to make sure we +// don't leak goroutines. 
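+// close is called both from ConsumerClosed and from the TrailingMetaCallback
+// registered in Init.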
+func (gssp *generativeSplitAndScatterProcessor) close() { + gssp.cancelScatterAndWaitForWorker() + gssp.InternalClose() +} + +func makeBackupMetadata( + ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.GenerativeSplitAndScatterSpec, +) ([]backuppb.BackupManifest, layerToBackupManifestFileIterFactory, error) { + + execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig) + + kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig, + execCfg.InternalDB, spec.User()) + + backupManifests, _, err := backupinfo.LoadBackupManifestsAtTime(ctx, nil, spec.URIs, + spec.User(), execCfg.DistSQLSrv.ExternalStorageFromURI, spec.Encryption, &kmsEnv, spec.EndTime) + if err != nil { + return nil, nil, err + } + + layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg, + backupManifests, spec.Encryption, &kmsEnv) + if err != nil { + return nil, nil, err + } + + return backupManifests, layerToBackupManifestFileIterFactory, nil +} + +type restoreEntryChunk struct { + entries []execinfrapb.RestoreSpanEntry + splitKey roachpb.Key +} + +func runGenerativeSplitAndScatter( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + spec *execinfrapb.GenerativeSplitAndScatterSpec, + scatterer splitAndScatterer, + doneScatterCh chan<- entryNode, +) error { + log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes", + spec.NumEntries, spec.ChunkSize, spec.NumNodes) + g := ctxgroup.WithContext(ctx) + + splitWorkers := int(spec.NumNodes) + restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize)) + g.GoCtx(func(ctx context.Context) error { + defer close(restoreSpanEntriesCh) + + backups, layerToFileIterFactory, err := makeBackupMetadata(ctx, + flowCtx, spec) + if err != nil { + return err + } + + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime) + if err != nil { + return err + } + + backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User()) + if err != nil { + return err + } + + return generateAndSendImportSpans( + ctx, + spec.Spans, + backups, + layerToFileIterFactory, + backupLocalityMap, + introducedSpanFrontier, + spec.HighWater, + spec.TargetSize, + restoreSpanEntriesCh, + spec.UseSimpleImportSpans, + ) + }) + + restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers) + g.GoCtx(func(ctx context.Context) error { + defer close(restoreEntryChunksCh) + + var idx int64 + var chunk restoreEntryChunk + for entry := range restoreSpanEntriesCh { + entry.ProgressIdx = idx + idx++ + if len(chunk.entries) == int(spec.ChunkSize) { + chunk.splitKey = entry.Span.Key + restoreEntryChunksCh <- chunk + chunk = restoreEntryChunk{} + } + chunk.entries = append(chunk.entries, entry) + } + + if len(chunk.entries) > 0 { + restoreEntryChunksCh <- chunk + } + return nil + }) + + importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2) + g2 := ctxgroup.WithContext(ctx) + for worker := 0; worker < splitWorkers; worker++ { + g2.GoCtx(func(ctx context.Context) error { + // Chunks' leaseholders should be randomly placed throughout the + // cluster. + for importSpanChunk := range restoreEntryChunksCh { + scatterKey := importSpanChunk.entries[0].Span.Key + if !importSpanChunk.splitKey.Equal(roachpb.Key{}) { + // Split at the start of the next chunk, to partition off a + // prefix of the space to scatter. 
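+				// The final chunk is sent by the chunking goroutine with an
+				// empty splitKey, in which case no split is performed for it.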
+ if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil { + return err + } + } + chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey) + if err != nil { + return err + } + if chunkDestination == 0 { + // If scatter failed to find a node for range ingestion, route the range + // to the node currently running the split and scatter processor. + if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok { + chunkDestination = nodeID + log.Warningf(ctx, "scatter returned node 0. "+ + "Route span starting at %s to current node %v", scatterKey, nodeID) + } else { + log.Warningf(ctx, "scatter returned node 0. "+ + "Route span starting at %s to default stream", scatterKey) + } + } + + sc := scatteredChunk{ + destination: chunkDestination, + entries: importSpanChunk.entries, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case importSpanChunksCh <- sc: + } + } + return nil + }) + } + + g.GoCtx(func(ctx context.Context) error { + defer close(importSpanChunksCh) + return g2.Wait() + }) + + // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the + // number of nodes in the cluster. Is it necessary? + splitScatterWorkers := 2 * splitWorkers + for worker := 0; worker < splitScatterWorkers; worker++ { + g.GoCtx(func(ctx context.Context) error { + for importSpanChunk := range importSpanChunksCh { + chunkDestination := importSpanChunk.destination + for i, importEntry := range importSpanChunk.entries { + nextChunkIdx := i + 1 + + log.VInfof(ctx, 2, "processing a span [%s,%s)", importEntry.Span.Key, importEntry.Span.EndKey) + var splitKey roachpb.Key + if nextChunkIdx < len(importSpanChunk.entries) { + // Split at the next entry. + splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key + if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil { + return err + } + } + + scatteredEntry := entryNode{ + entry: importEntry, + node: chunkDestination, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case doneScatterCh <- scatteredEntry: + } + } + } + return nil + }) + } + + return g.Wait() +} + +func init() { + rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 863b6d193365..ce93e174359d 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -77,6 +78,13 @@ import ( // tables we process in a single txn when restoring their table statistics. var restoreStatsInsertBatchSize = 10 +var useSimpleImportSpans = settings.RegisterBoolSetting( + settings.TenantWritable, + "bulkio.restore.use_simple_import_spans", + "if set to true, restore will generate its import spans using the makeSimpleImportSpans algorithm", + false, +) + // rewriteBackupSpanKey rewrites a backup span start key for the purposes of // splitting up the target key-space to send out the actual work of restoring. 
// @@ -257,15 +265,6 @@ func restore( return emptyRowCount, nil } - mu := struct { - syncutil.Mutex - highWaterMark int - res roachpb.RowCount - requestsCompleted []bool - }{ - highWaterMark: -1, - } - backupLocalityMap, err := makeBackupLocalityMap(backupLocalityInfo, user) if err != nil { return emptyRowCount, errors.Wrap(err, "resolving locality locations") @@ -289,53 +288,65 @@ func restore( if err != nil { return emptyRowCount, err } - importSpans, err := makeSimpleImportSpans( - dataToRestore.getSpans(), - backupManifests, - layerToBackupManifestFileIterFactory, - backupLocalityMap, - introducedSpanFrontier, - highWaterMark, - targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) - if err != nil { - return emptyRowCount, err - } - if len(importSpans) == 0 { - // There are no files to restore. - return emptyRowCount, nil - } + simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) - for i := range importSpans { - importSpans[i].ProgressIdx = int64(i) + mu := struct { + syncutil.Mutex + highWaterMark int64 + ceiling int64 + res roachpb.RowCount + // As part of job progress tracking, inFlightImportSpans tracks all the + // spans that have been generated are being processed by the processors in + // distRestore. requestsCompleleted tracks the spans from + // inFlightImportSpans that have completed its processing. Once all spans up + // to index N have been processed (and appear in requestsCompleted), then + // any spans with index < N will be removed from both inFlightImportSpans + // and requestsCompleted maps. + inFlightImportSpans map[int64]roachpb.Span + requestsCompleted map[int64]bool + }{ + highWaterMark: -1, + ceiling: 0, + inFlightImportSpans: make(map[int64]roachpb.Span), + requestsCompleted: make(map[int64]bool), + } + + targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) + importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + genSpan := func(ctx context.Context) error { + defer close(importSpanCh) + return generateAndSendImportSpans( + restoreCtx, + dataToRestore.getSpans(), + backupManifests, + layerToBackupManifestFileIterFactory, + backupLocalityMap, + introducedSpanFrontier, + highWaterMark, + targetSize, + importSpanCh, + simpleImportSpans, + ) } - mu.requestsCompleted = make([]bool, len(importSpans)) - // TODO(pbardea): This not super principled. I just wanted something that - // wasn't a constant and grew slower than linear with the length of - // importSpans. It seems to be working well for BenchmarkRestore2TB but - // worth revisiting. - // It tries to take the cluster size into account so that larger clusters - // distribute more chunks amongst them so that after scattering there isn't - // a large varience in the distribution of entries. - chunkSize := int(math.Sqrt(float64(len(importSpans)))) / numNodes - if chunkSize == 0 { - chunkSize = 1 - } - importSpanChunks := make([][]execinfrapb.RestoreSpanEntry, 0, len(importSpans)/chunkSize) - for start := 0; start < len(importSpans); { - importSpanChunk := importSpans[start:] - end := start + chunkSize - if end < len(importSpans) { - importSpanChunk = importSpans[start:end] + // Count number of import spans. 
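+	// The total count is needed up front to size the progress-tracking state
+	// and to compute the chunk size used by the generative split and scatter
+	// processor, so the span generator is run once here just to count spans
+	// and is run again as part of the restore flow below.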
+ var numImportSpans int + var countTasks []func(ctx context.Context) error + log.Infof(restoreCtx, "rh_debug: starting count task") + spanCountTask := func(ctx context.Context) error { + for range importSpanCh { + numImportSpans++ } - importSpanChunks = append(importSpanChunks, importSpanChunk) - start = end + return nil + } + countTasks = append(countTasks, genSpan, spanCountTask) + if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil { + return emptyRowCount, errors.Wrapf(err, "counting number of import spans") } - requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block - progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - + importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000) + requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block // tasks are the concurrent tasks that are run during the restore. var tasks []func(ctx context.Context) error if dataToRestore.isMainBundle() { @@ -344,13 +355,13 @@ func restore( // cluster restores) may be restored first. When restoring that data, we // don't want to update the high-water mark key, so instead progress is just // defined on the main data bundle (of which there should only be one). - progressLogger := jobs.NewChunkProgressLogger(job, len(importSpans), job.FractionCompleted(), + progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), func(progressedCtx context.Context, details jobspb.ProgressDetails) { switch d := details.(type) { case *jobspb.Progress_Restore: mu.Lock() if mu.highWaterMark >= 0 { - d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + d.Restore.HighWater = mu.inFlightImportSpans[mu.highWaterMark].Key } mu.Unlock() default: @@ -366,10 +377,10 @@ func restore( tasks = append(tasks, jobProgressLoop) } - jobCheckpointLoop := func(ctx context.Context) error { + progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) + + generativeCheckpointLoop := func(ctx context.Context) error { defer close(requestFinishedCh) - // When a processor is done importing a span, it will send a progress update - // to progCh. for progress := range progCh { mu.Lock() var progDetails backuppb.RestoreProgress @@ -380,16 +391,32 @@ func restore( mu.res.Add(progDetails.Summary) idx := progDetails.ProgressIdx - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, importSpans[idx], - ) + if idx >= mu.ceiling { + for i := mu.ceiling; i <= idx; i++ { + importSpan := <-importSpanCh + mu.inFlightImportSpans[i] = importSpan.Span + } + mu.ceiling = idx + 1 } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j + + if sp, ok := mu.inFlightImportSpans[idx]; ok { + // Assert that we're actually marking the correct span done. See #23977. 
+ if !sp.Key.Equal(progDetails.DataSpan.Key) { + mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, sp, + ) + } + mu.requestsCompleted[idx] = true + prevHighWater := mu.highWaterMark + for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } + + for j := prevHighWater; j < mu.highWaterMark; j++ { + delete(mu.requestsCompleted, j) + delete(mu.inFlightImportSpans, j) + } } mu.Unlock() @@ -399,14 +426,13 @@ func restore( } return nil } - tasks = append(tasks, jobCheckpointLoop) + tasks = append(tasks, generativeCheckpointLoop, genSpan) runRestore := func(ctx context.Context) error { return distRestore( ctx, execCtx, int64(job.ID()), - importSpanChunks, dataToRestore.getPKIDs(), encryption, kmsEnv, @@ -414,6 +440,14 @@ func restore( dataToRestore.getTenantRekeys(), endTime, dataToRestore.isValidateOnly(), + details.URIs, + dataToRestore.getSpans(), + backupLocalityInfo, + highWaterMark, + targetSize, + numNodes, + numImportSpans, + simpleImportSpans, progCh, ) } @@ -423,7 +457,7 @@ func restore( // This leaves the data that did get imported in case the user wants to // retry. // TODO(dan): Build tooling to allow a user to restart a failed restore. - return emptyRowCount, errors.Wrapf(err, "importing %d ranges", len(importSpans)) + return emptyRowCount, errors.Wrapf(err, "importing %d ranges", numImportSpans) } return mu.res, nil diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 225949930cd2..08ac4f069f45 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -11,6 +11,7 @@ package backupccl import ( "bytes" "context" + "math" "sort" "time" @@ -60,7 +61,6 @@ func distRestore( ctx context.Context, execCtx sql.JobExecContext, jobID int64, - chunks [][]execinfrapb.RestoreSpanEntry, pkIDs map[uint64]bool, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, @@ -68,6 +68,14 @@ func distRestore( tenantRekeys []execinfrapb.TenantRekey, restoreTime hlc.Timestamp, validateOnly bool, + uris []string, + requiredSpans []roachpb.Span, + backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + lowWaterMark roachpb.Key, + targetSize int64, + numNodes int, + numImportSpans int, + useSimpleImportSpans bool, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, ) error { defer close(progCh) @@ -107,12 +115,6 @@ func distRestore( p := planCtx.NewPhysicalPlan() - splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, - tenantRekeys, validateOnly) - if err != nil { - return nil, nil, err - } - restoreDataSpec := execinfrapb.RestoreDataSpec{ JobID: jobID, RestoreTime: restoreTime, @@ -123,12 +125,6 @@ func distRestore( ValidateOnly: validateOnly, } - if len(splitAndScatterSpecs) == 0 { - // We should return an error here as there are no nodes that are compatible, - // but we should have at least found ourselves. - return nil, nil, errors.AssertionFailedf("no compatible nodes") - } - // Plan SplitAndScatter in a round-robin fashion. 
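+	// A single generative split and scatter processor is planned on the
+	// current node; it generates the import spans itself rather than receiving
+	// pre-computed chunks in its spec.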
splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs) splitAndScatterProcs := make(map[base.SQLInstanceID]physicalplan.ProcessorIdx) @@ -162,33 +158,55 @@ func distRestore( return bytes.Compare(rangeRouterSpec.Spans[i].Start, rangeRouterSpec.Spans[j].Start) == -1 }) - for _, n := range sqlInstanceIDs { - spec := splitAndScatterSpecs[n] - if spec == nil { - // We may have fewer chunks than we have nodes for very small imports. In - // this case we only want to plan splitAndScatter nodes on a subset of - // nodes. Note that we still want to plan a RestoreData processor on every - // node since each entry could be scattered anywhere. - continue - } - proc := physicalplan.Processor{ - SQLInstanceID: n, - Spec: execinfrapb.ProcessorSpec{ - Core: execinfrapb.ProcessorCoreUnion{SplitAndScatter: splitAndScatterSpecs[n]}, - Post: execinfrapb.PostProcessSpec{}, - Output: []execinfrapb.OutputRouterSpec{ - { - Type: execinfrapb.OutputRouterSpec_BY_RANGE, - RangeRouterSpec: rangeRouterSpec, - }, + // TODO(pbardea): This not super principled. I just wanted something that + // wasn't a constant and grew slower than linear with the length of + // importSpans. It seems to be working well for BenchmarkRestore2TB but + // worth revisiting. + // It tries to take the cluster size into account so that larger clusters + // distribute more chunks amongst them so that after scattering there isn't + // a large varience in the distribution of entries. + chunkSize := int(math.Sqrt(float64(numImportSpans))) / numNodes + if chunkSize == 0 { + chunkSize = 1 + } + + id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID() + + spec := &execinfrapb.GenerativeSplitAndScatterSpec{ + TableRekeys: tableRekeys, + TenantRekeys: tenantRekeys, + ValidateOnly: validateOnly, + URIs: uris, + Encryption: encryption, + EndTime: restoreTime, + Spans: requiredSpans, + BackupLocalityInfo: backupLocalityInfo, + HighWater: lowWaterMark, + UserProto: execCtx.User().EncodeProto(), + TargetSize: targetSize, + ChunkSize: int64(chunkSize), + NumEntries: int64(numImportSpans), + NumNodes: int64(numNodes), + UseSimpleImportSpans: useSimpleImportSpans, + } + + proc := physicalplan.Processor{ + SQLInstanceID: id, + Spec: execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{GenerativeSplitAndScatter: spec}, + Post: execinfrapb.PostProcessSpec{}, + Output: []execinfrapb.OutputRouterSpec{ + { + Type: execinfrapb.OutputRouterSpec_BY_RANGE, + RangeRouterSpec: rangeRouterSpec, }, - StageID: splitAndScatterStageID, - ResultTypes: splitAndScatterOutputTypes, }, - } - pIdx := p.AddProcessor(proc) - splitAndScatterProcs[n] = pIdx + StageID: splitAndScatterStageID, + ResultTypes: splitAndScatterOutputTypes, + }, } + pIdx := p.AddProcessor(proc) + splitAndScatterProcs[id] = pIdx // Plan RestoreData. restoreDataStageID := p.NewStageOnNodes(sqlInstanceIDs) @@ -284,34 +302,3 @@ func distRestore( return g.Wait() } - -// makeSplitAndScatterSpecs returns a map from nodeID to the SplitAndScatter -// spec that should be planned on that node. Given the chunks of ranges to -// import it round-robin distributes the chunks amongst the given nodes. 
-func makeSplitAndScatterSpecs( - sqlInstanceIDs []base.SQLInstanceID, - chunks [][]execinfrapb.RestoreSpanEntry, - tableRekeys []execinfrapb.TableRekey, - tenantRekeys []execinfrapb.TenantRekey, - validateOnly bool, -) (map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec, error) { - specsBySQLInstanceID := make(map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec) - for i, chunk := range chunks { - sqlInstanceID := sqlInstanceIDs[i%len(sqlInstanceIDs)] - if spec, ok := specsBySQLInstanceID[sqlInstanceID]; ok { - spec.Chunks = append(spec.Chunks, execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{ - Entries: chunk, - }) - } else { - specsBySQLInstanceID[sqlInstanceID] = &execinfrapb.SplitAndScatterSpec{ - Chunks: []execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{{ - Entries: chunk, - }}, - TableRekeys: tableRekeys, - TenantRekeys: tenantRekeys, - ValidateOnly: validateOnly, - } - } - } - return specsBySQLInstanceID, nil -} diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index fb53655a2a98..b4abf18fa247 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -9,6 +9,7 @@ package backupccl import ( + "container/heap" "context" "sort" @@ -64,8 +65,10 @@ var targetRestoreSpanSize = settings.RegisterByteSizeSetting( // the `BackupManifest_Files` field of a manifest. type backupManifestFileIterator interface { next() (backuppb.BackupManifest_File, bool) + peek() (backuppb.BackupManifest_File, bool) err() error close() + reset() } // inMemoryFileIterator iterates over the `BackupManifest_Files` field stored @@ -76,11 +79,16 @@ type inMemoryFileIterator struct { } func (i *inMemoryFileIterator) next() (backuppb.BackupManifest_File, bool) { + f, hasNext := i.peek() + i.curIdx++ + return f, hasNext +} + +func (i *inMemoryFileIterator) peek() (backuppb.BackupManifest_File, bool) { if i.curIdx >= len(i.manifest.Files) { return backuppb.BackupManifest_File{}, false } f := i.manifest.Files[i.curIdx] - i.curIdx++ return f, true } @@ -90,6 +98,10 @@ func (i *inMemoryFileIterator) err() error { func (i *inMemoryFileIterator) close() {} +func (i *inMemoryFileIterator) reset() { + i.curIdx = 0 +} + var _ backupManifestFileIterator = &inMemoryFileIterator{} // makeBackupManifestFileIterator returns a backupManifestFileIterator that can @@ -138,19 +150,34 @@ type sstFileIterator struct { } func (s *sstFileIterator) next() (backuppb.BackupManifest_File, bool) { - var file backuppb.BackupManifest_File - hasNext := s.fi.Next(&file) - return file, hasNext + f, ok := s.peek() + if ok { + s.fi.Next() + } + + return f, ok +} + +func (s *sstFileIterator) peek() (backuppb.BackupManifest_File, bool) { + if ok, _ := s.fi.Valid(); !ok { + return backuppb.BackupManifest_File{}, false + } + return *s.fi.Value(), true } func (s *sstFileIterator) err() error { - return s.fi.Err() + _, err := s.fi.Valid() + return err } func (s *sstFileIterator) close() { s.fi.Close() } +func (s *sstFileIterator) reset() { + s.fi.Reset() +} + var _ backupManifestFileIterator = &sstFileIterator{} // makeSimpleImportSpans partitions the spans of requiredSpans into a covering @@ -382,3 +409,449 @@ func getBackupManifestFileIters( return layerToFileIterFactory, nil } + +// generateAndSendImportSpans partitions the spans of requiredSpans into a +// covering of RestoreSpanEntry's which each have all overlapping files from the +// passed backups assigned to them. 
The spans of requiredSpans are +// trimmed/removed based on the lowWaterMark before the covering for them is +// generated. These spans are generated one at a time and then sent to spanCh. +// +// Consider a chain of backups with files f1, f2… which cover spans as follows: +// +// backup +// 0| a___1___c c__2__e h__3__i +// 1| b___4___d g____5___i +// 2| a___________6______________h j_7_k +// 3| h_8_i l_9_m +// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// +// spans: |-------span1-------||---span2---| |---span3---| +// +// The cover for those spans would look like: +// +// [a, b): 1, 6 +// [b, c): 1, 4, 6 +// [c, f): 2, 4, 6 +// [f, g): 6 +// [g, h): 5, 6 +// [h, i): 3, 5, 8 +// [l, m): 9 +// +// This cover is created by iterating through the start and end keys of all the +// files in the backup in key order via fileSpanStartAndEndKeyIterator. The +// cover spans are initially just the spans between each pair of adjacent keys +// yielded by the iterator. We then iterate through each cover span and find all +// the overlapping files. If the files that overlap a cover span is a subset of +// the files that overlap the span before it, then the two spans are merged into +// a single span in the final cover. Additionally, if targetSize > 0, we can +// merge the current cover span with the previous cover span if the merged set +// of files have a total data size below the target size. +// +// The above example is tested in TestRestoreEntryCoverExample. +// +// If useSimpleImportSpans is true, the above covering method is not used and +// the covering is created by makeSimpleImportSpans instead. +func generateAndSendImportSpans( + ctx context.Context, + requiredSpans roachpb.Spans, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + backupLocalityMap map[int]storeByLocalityKV, + introducedSpanFrontier *spanUtils.Frontier, + lowWaterMark roachpb.Key, + targetSize int64, + spanCh chan execinfrapb.RestoreSpanEntry, + useSimpleImportSpans bool, +) error { + if useSimpleImportSpans { + importSpans, err := makeSimpleImportSpans(requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + if err != nil { + return err + } + + for _, sp := range importSpans { + spanCh <- sp + } + return nil + } + + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(backups, layerToBackupManifestFileIterFactory) + if err != nil { + return err + } + + fileIterByLayer := make([]backupManifestFileIterator, 0, len(backups)) + for layer := range backups { + iter, err := layerToBackupManifestFileIterFactory[layer]() + if err != nil { + return err + } + + fileIterByLayer = append(fileIterByLayer, iter) + } + + // lastCovSpanSize is the size of files added to the right-most span of + // the cover so far. 
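+	// lastCovSpan is that right-most span itself, covFilesByLayer holds the
+	// files assigned to it grouped by backup layer, and firstInSpan tracks
+	// whether we are still building the first cover span of the current
+	// required span.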
+ var lastCovSpanSize int64 + var lastCovSpan roachpb.Span + var covFilesByLayer [][]backuppb.BackupManifest_File + var firstInSpan bool + + flush := func() { + entry := execinfrapb.RestoreSpanEntry{ + Span: lastCovSpan, + } + + for layer := range covFilesByLayer { + for _, f := range covFilesByLayer[layer] { + fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} + if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { + fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir} + } + entry.Files = append(entry.Files, fileSpec) + } + } + + if len(entry.Files) > 0 { + spanCh <- entry + } + } + + for _, span := range requiredSpans { + firstInSpan = true + if span.EndKey.Compare(lowWaterMark) < 0 { + continue + } + if span.Key.Compare(lowWaterMark) < 0 { + span.Key = lowWaterMark + } + + layersCoveredLater := make(map[int]bool) + for layer := range backups { + var coveredLater bool + introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if backups[layer].EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + }) + if coveredLater { + // Don't use this backup to cover this span if the span was reintroduced + // after the backup's endTime. In this case, this backup may have + // invalid data, and further, a subsequent backup will contain all of + // this span's data. Consider the following example: + // + // T0: Begin IMPORT INTO on existing table foo, ingest some data + // T1: Backup foo + // T2: Rollback IMPORT via clearRange + // T3: Incremental backup of foo, with a full reintroduction of foo’s span + // T4: RESTORE foo: should only restore foo from the incremental backup. + // If data from the full backup were also restored, + // the imported-but-then-clearRanged data will leak in the restored cluster. + // This logic seeks to avoid this form of data corruption. 
+ layersCoveredLater[layer] = true + } + } + + for { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + return err + } + break + } + + key := startEndKeyIt.value() + if span.Key.Compare(key) >= 0 { + startEndKeyIt.next() + continue + } + + var coverSpan roachpb.Span + if firstInSpan { + coverSpan.Key = span.Key + } else { + coverSpan.Key = lastCovSpan.EndKey + } + + if span.ContainsKey(key) { + coverSpan.EndKey = startEndKeyIt.value() + } else { + coverSpan.EndKey = span.EndKey + } + + newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) + if err != nil { + return err + } + + var filesByLayer [][]backuppb.BackupManifest_File + var covSize int64 + var newCovFilesSize int64 + + for layer := range newFilesByLayer { + for _, file := range newFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + newCovFilesSize += sz + } + filesByLayer = append(filesByLayer, newFilesByLayer[layer]) + } + + for layer := range covFilesByLayer { + for _, file := range covFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + + if coverSpan.Overlaps(file.Span) { + covSize += sz + filesByLayer[layer] = append(filesByLayer[layer], file) + } + } + } + + if covFilesByLayer == nil { + covFilesByLayer = newFilesByLayer + lastCovSpan = coverSpan + lastCovSpanSize = newCovFilesSize + } else { + if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= targetSize) && !firstInSpan { + // If there are no new files that cover this span or if we can add the + // files in the new span's cover to the last span's cover and still stay + // below targetSize, then we should merge the two spans. + for layer := range newFilesByLayer { + covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) + } + lastCovSpan.EndKey = coverSpan.EndKey + lastCovSpanSize = lastCovSpanSize + newCovFilesSize + } else { + flush() + lastCovSpan = coverSpan + covFilesByLayer = filesByLayer + lastCovSpanSize = covSize + } + } + firstInSpan = false + + if lastCovSpan.EndKey.Equal(span.EndKey) { + break + } + + startEndKeyIt.next() + } + } + + flush() + return nil +} + +// fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys +// of the spans from the files in a backup chain in key order. A start or end +// key from a file span will be yielded by the iterator if the key is not +// covered by another file span within the same layer before it in FileCmp +// order. In particular, this means that if all layers in a backup chain have +// files with non-overlapping spans, then this iterator would return all start +// and end keys for all file spans in order. For example: +// +// backup +// 0| a___1___c c__2__e h__3__i +// 1| b___4___d g____5___i +// 2| a___________6______________h j_7_k +// 3| h_8_i l_9_m +// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// +// In this case, since no file span overlaps with another file span within the same layer, +// the iterator will yield all start and end keys: +// [a, b, c, d, e, g, h, i, j, k, l, m] +// +// Another example, but with file spans that do overlap within a layer: +// +// backup +// 0| a___1___c +// 1| b_____2_____e +// | d___3___f +// 2| a___________4___________g +// 3| +// keys--a---b---c---d---e---f---g---> +// +// In this case, there is overlap between files 2 and 3 within layer 1. 
Since +// the start and end keys 'b' and 'e' of file 2 will be yielded by the iterator +// since there are no files before it within the same layer. Start key 'd' of +// file 3 will not be yielded since it's covered by 2's span. The end key 'f' +// will still be yielded since it's not covered by 2's span. So the iterator +// will yield: +// [a, b, c, e, f, g] +type fileSpanStartAndEndKeyIterator struct { + heap *fileHeap + allIters []backupManifestFileIterator + err error +} + +func newFileSpanStartAndEndKeyIterator( + backups []backuppb.BackupManifest, layerToIterFactory layerToBackupManifestFileIterFactory, +) (*fileSpanStartAndEndKeyIterator, error) { + it := &fileSpanStartAndEndKeyIterator{} + for layer := range backups { + iter, err := layerToIterFactory[layer]() + if err != nil { + return nil, err + } + + it.allIters = append(it.allIters, iter) + } + it.reset() + return it, nil +} + +func (i *fileSpanStartAndEndKeyIterator) next() { + if ok, _ := i.valid(); !ok { + return + } + + prevKey := i.value() + for i.heap.Len() > 0 { + minItem := heap.Pop(i.heap).(fileHeapItem) + + curKey := minItem.key() + if curKey.Compare(prevKey) > 0 { + heap.Push(i.heap, minItem) + break + } + + if minItem.cmpEndKey { + file, ok := minItem.fileIter.next() + if err := minItem.fileIter.err(); err != nil { + i.err = err + return + } + if ok { + minItem.cmpEndKey = false + minItem.file = file + heap.Push(i.heap, minItem) + } + } else { + minItem.cmpEndKey = true + heap.Push(i.heap, minItem) + } + } +} + +func (i *fileSpanStartAndEndKeyIterator) valid() (bool, error) { + if i.err != nil { + return false, i.err + } + return i.heap.Len() > 0, nil +} + +func (i *fileSpanStartAndEndKeyIterator) value() roachpb.Key { + if ok, _ := i.valid(); !ok { + return nil + } + + return i.heap.fileHeapItems[0].key() +} +func (i *fileSpanStartAndEndKeyIterator) reset() { + i.heap = &fileHeap{} + i.err = nil + + for _, iter := range i.allIters { + iter.reset() + + file, ok := iter.next() + if err := iter.err(); err != nil { + i.err = err + return + } + if ok { + i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ + fileIter: iter, + file: file, + cmpEndKey: false, + }) + } + } + heap.Init(i.heap) +} + +type fileHeapItem struct { + fileIter backupManifestFileIterator + file backuppb.BackupManifest_File + cmpEndKey bool +} + +func (f fileHeapItem) key() roachpb.Key { + if f.cmpEndKey { + return f.file.Span.EndKey + } + return f.file.Span.Key +} + +type fileHeap struct { + fileHeapItems []fileHeapItem +} + +func (f *fileHeap) Len() int { + return len(f.fileHeapItems) +} + +func (f *fileHeap) Less(i, j int) bool { + return f.fileHeapItems[i].key().Compare(f.fileHeapItems[j].key()) < 0 +} + +func (f *fileHeap) Swap(i, j int) { + f.fileHeapItems[i], f.fileHeapItems[j] = f.fileHeapItems[j], f.fileHeapItems[i] +} + +func (f *fileHeap) Push(x any) { + item, ok := x.(fileHeapItem) + if !ok { + panic("pushed value not fileHeapItem") + } + + f.fileHeapItems = append(f.fileHeapItems, item) +} + +func (f *fileHeap) Pop() any { + old := f.fileHeapItems + n := len(old) + item := old[n-1] + f.fileHeapItems = old[0 : n-1] + return item +} + +func getNewIntersectingFilesByLayer( + span roachpb.Span, layersCoveredLater map[int]bool, fileIters []backupManifestFileIterator, +) ([][]backuppb.BackupManifest_File, error) { + var files [][]backuppb.BackupManifest_File + + for l, iter := range fileIters { + var layerFiles []backuppb.BackupManifest_File + if !layersCoveredLater[l] { + for ; ; iter.next() { + f, ok := iter.peek() + if !ok { + 
break + } + + if span.Overlaps(f.Span) { + layerFiles = append(layerFiles, f) + } + + if span.EndKey.Compare(f.Span.Key) <= 0 { + break + } + } + if iter.err() != nil { + return nil, iter.err() + } + } + files = append(files, layerFiles) + } + + return files, nil +} diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index afa06a8474b5..36179e309fda 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -138,13 +139,12 @@ func MockBackupChain( // iterating through the partitions of the cover and removing that partition's // span from the group for every file specified by that partition, and then // checking that all the groups are empty, indicating no needed span was missed. -// It also checks that each file that the cover has an expected number of -// partitions (i.e. isn't just one big partition of all files), by comparing its -// length to the number of files a file's end key was greater than any prior end -// key when walking files in order by start key in the backups. This check is -// thus sensitive to ordering; the coverage correctness check however is not. // // The function also verifies that a cover does not cross a span boundary. +// +// TODO(rui): this check previously contained a partition count check. +// Partitions are now generated differently, so this is a reminder to add this +// check back in when I figure out what the expected partition count should be. 
func checkRestoreCovering( ctx context.Context, backups []backuppb.BackupManifest, @@ -153,7 +153,6 @@ func checkRestoreCovering( merged bool, storageFactory cloud.ExternalStorageFactory, ) error { - var expectedPartitions int required := make(map[string]*roachpb.SpanGroup) introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) @@ -194,7 +193,6 @@ func checkRestoreCovering( required[f.Path].Add(sp) if sp.EndKey.Compare(last) > 0 { last = sp.EndKey - expectedPartitions++ } } } @@ -224,18 +222,50 @@ func checkRestoreCovering( return errors.Errorf("file %s was supposed to cover span %s", name, missing) } } - if got := len(cov); got != expectedPartitions && !merged { - return errors.Errorf("expected %d partitions, got %d", expectedPartitions, got) - } return nil } const noSpanTargetSize = 0 +func makeImportSpans( + ctx context.Context, + spans []roachpb.Span, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + targetSize int64, + introducedSpanFrontier *spanUtils.Frontier, + useSimpleImportSpans bool, +) ([]execinfrapb.RestoreSpanEntry, error) { + var cover []execinfrapb.RestoreSpanEntry + spanCh := make(chan execinfrapb.RestoreSpanEntry) + g := ctxgroup.WithContext(context.Background()) + g.Go(func() error { + for entry := range spanCh { + cover = append(cover, entry) + } + return nil + }) + + err := generateAndSendImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + close(spanCh) + + if err != nil { + return nil, err + } + if err := g.Wait(); err != nil { + return nil, err + } + return cover, nil +} + func TestRestoreEntryCoverExample(t *testing.T) { defer leaktest.AfterTest(t)() + + const numAccounts = 1 ctx := context.Background() - tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) + + tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, + InitManualReplication) defer cleanupFn() sp := func(start, end string) roachpb.Span { @@ -278,23 +308,26 @@ func TestRestoreEntryCoverExample(t *testing.T) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(t, err) - cover, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, nil, - emptySpanFrontier, nil, noSpanTargetSize) + cover, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "e"), Files: paths("2", "4", "6")}, - {Span: sp("e", "f"), Files: paths("6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("a", "b"), Files: paths("1", "6")}, + {Span: sp("b", "c"), Files: paths("1", "4", "6")}, + {Span: sp("c", "f"), Files: paths("2", "4", "6")}, + {Span: sp("f", "g"), Files: paths("6")}, + {Span: sp("g", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, cover) - coverSized, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, - nil, emptySpanFrontier, nil, 2<<20) + coverSized, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, 2<<20, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - 
{Span: sp("a", "f"), Files: paths("1", "2", "4", "6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("a", "b"), Files: paths("1", "6")}, + {Span: sp("b", "c"), Files: paths("1", "4", "6")}, + {Span: sp("c", "f"), Files: paths("2", "4", "6")}, + {Span: sp("f", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, coverSized) @@ -303,12 +336,13 @@ func TestRestoreEntryCoverExample(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - coverIntroduced, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, - noSpanTargetSize) + coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, introducedSpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ {Span: sp("a", "f"), Files: paths("6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("f", "g"), Files: paths("6")}, + {Span: sp("g", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, coverIntroduced) @@ -515,8 +549,8 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(t, err) - cover, err := makeSimpleImportSpans(restoreSpans, backups, layerToBackupManifestFileIterFactory, - nil, introducedSpanFrontier, nil, 0) + cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToBackupManifestFileIterFactory, + 0, introducedSpanFrontier, false) require.NoError(t, err) for _, reIntroTable := range reIntroducedTables { @@ -552,25 +586,26 @@ func TestRestoreEntryCover(t *testing.T) { for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} { for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} { for _, hasExternalFilesList := range []bool{true, false} { - backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) - require.NoError(t, err) - - for _, target := range []int64{0, 1, 4, 100, 1000} { - t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t", - numBackups, spans, files, target, hasExternalFilesList), func(t *testing.T) { - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) - require.NoError(t, err) - - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, - &execCfg, backups, nil, nil) - require.NoError(t, err) - cover, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, - nil, target<<20) - require.NoError(t, err) - require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, - cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) - }) + for _, simpleImportSpans := range []bool{true, false} { + backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) + require.NoError(t, err) + + for _, target := range []int64{0, 1, 4, 100, 1000} { + t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t, simple=%t", + numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) { + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + 
require.NoError(t, err) + + layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, + &execCfg, backups, nil, nil) + require.NoError(t, err) + cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups, + layerToBackupManifestFileIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans) + require.NoError(t, err) + require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, + cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) + }) + } } } } diff --git a/pkg/sql/execinfrapb/api.go b/pkg/sql/execinfrapb/api.go index f87b44fff2ae..ecd89228bf4f 100644 --- a/pkg/sql/execinfrapb/api.go +++ b/pkg/sql/execinfrapb/api.go @@ -86,3 +86,7 @@ func (m *ChangeAggregatorSpec) User() username.SQLUsername { func (m *ChangeFrontierSpec) User() username.SQLUsername { return m.UserProto.Decode() } + +func (m *GenerativeSplitAndScatterSpec) User() username.SQLUsername { + return m.UserProto.Decode() +} diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 5ef0914881e4..f0ab562aec4d 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -609,6 +609,12 @@ func (s *HashGroupJoinerSpec) summary() (string, []string) { return "HashGroupJoiner", details } +// summary implements the diagramCellType interface. +func (g *GenerativeSplitAndScatterSpec) summary() (string, []string) { + detail := fmt.Sprintf("%d import spans", g.NumEntries) + return "GenerativeSplitAndScatterSpec", []string{detail} +} + type diagramCell struct { Title string `json:"title"` Details []string `json:"details"` diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index 8b02a86a5bb5..cb5a518701e7 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -123,6 +123,7 @@ message ProcessorCoreUnion { optional IndexBackfillMergerSpec indexBackfillMerger = 38; optional TTLSpec ttl = 39; optional HashGroupJoinerSpec hashGroupJoiner = 40; + optional GenerativeSplitAndScatterSpec generativeSplitAndScatter = 41; reserved 6, 12, 14, 17, 18, 19, 20; } diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index b42d523c6dc4..0eb6c97c0fd4 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -391,3 +391,33 @@ message IndexBackfillMergerSpec { // NEXT ID: 9. } + +message GenerativeSplitAndScatterSpec { + repeated TableRekey table_rekeys = 1 [(gogoproto.nullable) = false]; + repeated TenantRekey tenant_rekeys = 2 [(gogoproto.nullable) = false]; + optional bool validate_only = 3 [(gogoproto.nullable) = false]; + + // URIs is the URIs of the backup manifests. + repeated string uris = 4 [(gogoproto.customname) = "URIs"]; + optional jobs.jobspb.BackupEncryptionOptions encryption = 5; + + // EndTime is the time of the restore. + optional util.hlc.Timestamp endTime = 9 [(gogoproto.nullable) = false]; + // Spans is the required spans in the restore. + repeated roachpb.Span spans = 10 [(gogoproto.nullable) = false]; + repeated jobs.jobspb.RestoreDetails.BackupLocalityInfo backup_locality_info = 11 [(gogoproto.nullable) = false]; + // HighWater is the high watermark of the previous run of restore. + optional bytes high_water = 12 [(gogoproto.nullable) = false]; + // User who initiated the restore. 
+ optional string user_proto = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; + // ChunkSize is the number of import spans per chunk. + optional int64 chunk_size = 14[(gogoproto.nullable) = false]; + // TargetSize is the target size for each import span. + optional int64 target_size = 15[(gogoproto.nullable) = false]; + // NumEntries is the total number of import spans in this restore. + optional int64 num_entries = 16[(gogoproto.nullable) = false]; + // NumNodes is the number of nodes available for dist restore. + optional int64 num_nodes = 17[(gogoproto.nullable) = false]; + optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"]; + optional bool use_simple_import_spans = 19 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 16f97ca0ed13..1ccf097d7e2f 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -376,6 +376,15 @@ func NewProcessor( } return newHashGroupJoiner(ctx, flowCtx, processorID, core.HashGroupJoiner, inputs[0], inputs[1], post, outputs[0]) } + if core.GenerativeSplitAndScatter != nil { + if err := checkNumInOut(inputs, outputs, 0, 1); err != nil { + return nil, err + } + if NewGenerativeSplitAndScatterProcessor == nil { + return nil, errors.New("GenerativeSplitAndScatter processor unimplemented") + } + return NewGenerativeSplitAndScatterProcessor(ctx, flowCtx, processorID, *core.GenerativeSplitAndScatter, post, outputs[0]) + } return nil, errors.Errorf("unsupported processor core %q", core) } @@ -411,3 +420,6 @@ var NewStreamIngestionFrontierProcessor func(context.Context, *execinfra.FlowCtx // NewTTLProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewTTLProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.TTLSpec, execinfra.RowReceiver) (execinfra.Processor, error) + +// NewGenerativeSplitAndScatterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. +var NewGenerativeSplitAndScatterProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.GenerativeSplitAndScatterSpec, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
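
Note for reviewers: the sketch below is a minimal, self-contained illustration of the streaming pattern this patch adopts, namely a producer that generates import spans one at a time into a channel, drained by a consumer goroutine, mirroring the new makeImportSpans test helper above. It is not code from this patch: the entry type, the generate function, and the use of golang.org/x/sync/errgroup are stand-ins chosen only so the example compiles on its own (the real helper uses pkg/util/ctxgroup, generateAndSendImportSpans, and execinfrapb.RestoreSpanEntry).

package main

import (
	"context"
	"fmt"
	"log"

	"golang.org/x/sync/errgroup"
)

// entry stands in for execinfrapb.RestoreSpanEntry.
type entry struct{ span string }

// generate stands in for generateAndSendImportSpans: it emits entries one at a
// time on out instead of materializing the whole cover in memory.
func generate(ctx context.Context, spans []string, out chan<- entry) error {
	for _, s := range spans {
		select {
		case out <- entry{span: s}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	ch := make(chan entry)

	var cover []entry
	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error {
		// Consumer: accumulate entries until the producer closes the channel.
		for e := range ch {
			cover = append(cover, e)
		}
		return nil
	})

	// Producer: run generation synchronously, close the channel so the
	// consumer goroutine can finish, and only then check the error.
	err := generate(gCtx, []string{"a-b", "b-c", "c-d"}, ch)
	close(ch)
	if err != nil {
		log.Fatal(err)
	}
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("collected", len(cover), "entries")
}

Closing the channel before inspecting the producer's error guarantees the consumer goroutine always terminates, so the group's Wait cannot block even when generation fails; this matches the ordering in the test helper. Separately, the nil check on NewGenerativeSplitAndScatterProcessor in pkg/sql/rowexec/processors.go follows the existing convention for CCL-only processors: the constructor variable is assigned by the CCL package at initialization, so a build without it returns the "unimplemented" error instead of panicking.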