diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 8f0148efeedb..4c8af2228502 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "backup_telemetry.go", "create_scheduled_backup.go", "file_sst_sink.go", + "generative_split_and_scatter_processor.go", "key_rewriter.go", "restoration_data.go", "restore_data_processor.go", diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backup_metadata_test.go index c9338be5365c..955bbd7d3a0b 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backup_metadata_test.go @@ -187,18 +187,22 @@ func checkFiles( ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaFiles []backuppb.BackupManifest_File - var file backuppb.BackupManifest_File it, err := bm.NewFileIter(ctx) if err != nil { t.Fatal(err) } defer it.Close() - for it.Next(&file) { - metaFiles = append(metaFiles, file) - } - if it.Err() != nil { - t.Fatal(it.Err()) + for ; ; it.Next() { + ok, err := it.Valid() + if err != nil { + t.Fatal(err) + } + if !ok { + break + } + + metaFiles = append(metaFiles, *it.Value()) } require.Equal(t, m.Files, metaFiles) diff --git a/pkg/ccl/backupccl/backupinfo/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go index efc648336e9c..67adef4b782c 100644 --- a/pkg/ccl/backupccl/backupinfo/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -262,6 +262,14 @@ func writeDescsToMetadata( return nil } +func FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int { + if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 { + return cmp + } + + return strings.Compare(left.Path, right.Path) +} + func writeFilesSST( ctx context.Context, m *backuppb.BackupManifest, @@ -280,8 +288,7 @@ func writeFilesSST( // Sort and write all of the files into a single file info SST. sort.Slice(m.Files, func(i, j int) bool { - cmp := m.Files[i].Span.Key.Compare(m.Files[j].Span.Key) - return cmp < 0 || (cmp == 0 && strings.Compare(m.Files[i].Path, m.Files[j].Path) < 0) + return FileCmp(m.Files[i], m.Files[j]) < 0 }) for i := range m.Files { @@ -1015,10 +1022,11 @@ func (si *SpanIterator) Next(span *roachpb.Span) bool { return false } -// FileIterator is a simple iterator to iterate over stats.TableStatisticProtos. +// FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File. type FileIterator struct { mergedIterator storage.SimpleMVCCIterator err error + file *backuppb.BackupManifest_File } // NewFileIter creates a new FileIterator for the backup metadata. @@ -1076,40 +1084,51 @@ func (fi *FileIterator) Close() { fi.mergedIterator.Close() } -// Err returns the iterator's error. -func (fi *FileIterator) Err() error { - return fi.err -} - -// Next retrieves the next file in the iterator. -// -// Next returns true if next element was successfully unmarshalled into file, -// and false if there are no more elements or if an error was encountered. When -// Next returns false, the user should call the Err method to verify the -// existence of an error. -func (fi *FileIterator) Next(file *backuppb.BackupManifest_File) bool { +// Valid indicates whether or not the iterator is pointing to a valid value. 
+func (fi *FileIterator) Valid() (bool, error) {
 	if fi.err != nil {
-		return false
+		return false, fi.err
 	}
-	valid, err := fi.mergedIterator.Valid()
-	if err != nil || !valid {
-		fi.err = err
-		return false
-	}
-	v, err := fi.mergedIterator.UnsafeValue()
-	if err != nil {
+	if ok, err := fi.mergedIterator.Valid(); !ok {
 		fi.err = err
-		return false
+		return ok, err
 	}
-	err = protoutil.Unmarshal(v, file)
-	if err != nil {
-		fi.err = err
-		return false
+
+	if fi.file == nil {
+		v, err := fi.mergedIterator.UnsafeValue()
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+
+		file := &backuppb.BackupManifest_File{}
+		err = protoutil.Unmarshal(v, file)
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+		fi.file = file
 	}
+	return true, nil
+}
+
+// Value returns the current value of the iterator, if valid.
+func (fi *FileIterator) Value() *backuppb.BackupManifest_File {
+	return fi.file
+}
+// Next advances the iterator to the next value.
+func (fi *FileIterator) Next() {
 	fi.mergedIterator.Next()
-	return true
+	fi.file = nil
+}
+
+// Reset resets the iterator to the first value.
+func (fi *FileIterator) Reset() {
+	fi.mergedIterator.SeekGE(storage.MVCCKey{})
+	fi.err = nil
+	fi.file = nil
 }
 
 // DescIterator is a simple iterator to iterate over descpb.Descriptors.
diff --git a/pkg/ccl/backupccl/backuprand/backup_rand_test.go b/pkg/ccl/backupccl/backuprand/backup_rand_test.go
index e462443a47e3..8dd6ccc0fc01 100644
--- a/pkg/ccl/backupccl/backuprand/backup_rand_test.go
+++ b/pkg/ccl/backupccl/backuprand/backup_rand_test.go
@@ -33,7 +33,9 @@ import (
 // randomly generated tables and verifies their data and schema are preserved.
 // It tests that full database backup as well as all subsets of per-table backup
 // roundtrip properly. 50% of the time, the test runs the restore with the
-// schema_only parameter, which does not restore any rows from user tables.
+// schema_only parameter, which does not restore any rows from user tables. The
+// test will also run with bulkio.restore.use_simple_import_spans set to true
+// 50% of the time.
func TestBackupRestoreRandomDataRoundtrips(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -72,6 +74,10 @@ func TestBackupRestoreRandomDataRoundtrips(t *testing.T) { runSchemaOnlyExtension = ", schema_only" } + if rng.Intn(2) == 0 { + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.use_simple_import_spans = true") + } + tables := sqlDB.Query(t, `SELECT name FROM crdb_internal.tables WHERE database_name = 'rand' AND schema_name = 'public'`) var tableNames []string diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index 5ecf2fb4aee9..a832efe65bb0 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -14,6 +14,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/stretchr/testify/require" @@ -27,6 +29,7 @@ func BenchmarkCoverageChecks(b *testing.B) { r, _ := randutil.NewTestRand() for _, numBackups := range []int{1, 7, 24, 24 * 4} { + numBackups := numBackups b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) { for _, numSpans := range []int{10, 20, 100} { b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) { @@ -61,6 +64,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) { ctx := context.Background() r, _ := randutil.NewTestRand() for _, numBackups := range []int{1, 2, 24, 24 * 4} { + numBackups := numBackups b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) { for _, baseFiles := range []int{0, 100, 10000} { b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) { @@ -82,10 +86,22 @@ func BenchmarkRestoreEntryCover(b *testing.B) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(b, err) - cov, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, - nil, 0) - require.NoError(b, err) + + spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + defer close(spanCh) + return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups, + layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false) + }) + + var cov []execinfrapb.RestoreSpanEntry + for entry := range spanCh { + cov = append(cov, entry) + } + + require.NoError(b, g.Wait()) b.ReportMetric(float64(len(cov)), "coverSize") } }) diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go new file mode 100644 index 000000000000..bffa2482329e --- /dev/null +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -0,0 +1,384 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowexec" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/logtags" +) + +const generativeSplitAndScatterProcessorName = "generativeSplitAndScatter" + +var generativeSplitAndScatterOutputTypes = []*types.T{ + types.Bytes, // Span key for the range router + types.Bytes, // RestoreDataEntry bytes +} + +// generativeSplitAndScatterProcessor is given a backup chain, whose manifests +// are specified in URIs and iteratively generates RestoreSpanEntries to be +// distributed across the cluster. Depending on which node the span ends up on, +// it forwards RestoreSpanEntry as bytes along with the key of the span on a +// row. It expects an output RangeRouter and before it emits each row, it +// updates the entry in the RangeRouter's map with the destination of the +// scatter. +type generativeSplitAndScatterProcessor struct { + execinfra.ProcessorBase + + flowCtx *execinfra.FlowCtx + spec execinfrapb.GenerativeSplitAndScatterSpec + output execinfra.RowReceiver + + scatterer splitAndScatterer + // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for + // it to finish. + cancelScatterAndWaitForWorker func() + + doneScatterCh chan entryNode + // A cache for routing datums, so only 1 is allocated per node. + routingDatumCache map[roachpb.NodeID]rowenc.EncDatum + scatterErr error +} + +var _ execinfra.Processor = &generativeSplitAndScatterProcessor{} + +func newGenerativeSplitAndScatterProcessor( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + processorID int32, + spec execinfrapb.GenerativeSplitAndScatterSpec, + post *execinfrapb.PostProcessSpec, + output execinfra.RowReceiver, +) (execinfra.Processor, error) { + + db := flowCtx.Cfg.DB + kr, err := MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.TableRekeys, spec.TenantRekeys, + false /* restoreTenantFromStream */) + if err != nil { + return nil, err + } + + scatterer := makeSplitAndScatterer(db.KV(), kr) + if spec.ValidateOnly { + nodeID, _ := flowCtx.NodeID.OptionalNodeID() + scatterer = noopSplitAndScatterer{nodeID} + } + ssp := &generativeSplitAndScatterProcessor{ + flowCtx: flowCtx, + spec: spec, + output: output, + scatterer: scatterer, + // Large enough so that it never blocks. 
+ doneScatterCh: make(chan entryNode, spec.NumEntries), + routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum), + } + if err := ssp.Init(ctx, ssp, post, generativeSplitAndScatterOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ + execinfra.ProcStateOpts{ + InputsToDrain: nil, // there are no inputs to drain + TrailingMetaCallback: func() []execinfrapb.ProducerMetadata { + ssp.close() + return nil + }, + }); err != nil { + return nil, err + } + return ssp, nil +} + +// Start is part of the RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) Start(ctx context.Context) { + ctx = logtags.AddTag(ctx, "job", gssp.spec.JobID) + ctx = gssp.StartInternal(ctx, generativeSplitAndScatterProcessorName) + // Note that the loop over doneScatterCh in Next should prevent the goroutine + // below from leaking when there are no errors. However, if that loop needs to + // exit early, runSplitAndScatter's context will be canceled. + scatterCtx, cancel := context.WithCancel(ctx) + workerDone := make(chan struct{}) + gssp.cancelScatterAndWaitForWorker = func() { + cancel() + <-workerDone + } + if err := gssp.flowCtx.Stopper().RunAsyncTaskEx(scatterCtx, stop.TaskOpts{ + TaskName: "generativeSplitAndScatter-worker", + SpanOpt: stop.ChildSpan, + }, func(ctx context.Context) { + gssp.scatterErr = runGenerativeSplitAndScatter(scatterCtx, gssp.flowCtx, &gssp.spec, gssp.scatterer, gssp.doneScatterCh) + cancel() + close(gssp.doneScatterCh) + close(workerDone) + }); err != nil { + gssp.scatterErr = err + cancel() + close(workerDone) + } +} + +// Next implements the execinfra.RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) Next() ( + rowenc.EncDatumRow, + *execinfrapb.ProducerMetadata, +) { + if gssp.State != execinfra.StateRunning { + return nil, gssp.DrainHelper() + } + + scatteredEntry, ok := <-gssp.doneScatterCh + if ok { + entry := scatteredEntry.entry + entryBytes, err := protoutil.Marshal(&entry) + if err != nil { + gssp.MoveToDraining(err) + return nil, gssp.DrainHelper() + } + + // The routing datums informs the router which output stream should be used. + routingDatum, ok := gssp.routingDatumCache[scatteredEntry.node] + if !ok { + routingDatum, _ = routingDatumsForSQLInstance(base.SQLInstanceID(scatteredEntry.node)) + gssp.routingDatumCache[scatteredEntry.node] = routingDatum + } + + row := rowenc.EncDatumRow{ + routingDatum, + rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(entryBytes))), + } + return row, nil + } + + if gssp.scatterErr != nil { + gssp.MoveToDraining(gssp.scatterErr) + return nil, gssp.DrainHelper() + } + + gssp.MoveToDraining(nil /* error */) + return nil, gssp.DrainHelper() +} + +// ConsumerClosed is part of the RowSource interface. +func (gssp *generativeSplitAndScatterProcessor) ConsumerClosed() { + // The consumer is done, Next() will not be called again. + gssp.close() +} + +// close stops the production workers. This needs to be called if the consumer +// runs into an error and stops consuming scattered entries to make sure we +// don't leak goroutines. 
+func (gssp *generativeSplitAndScatterProcessor) close() { + gssp.cancelScatterAndWaitForWorker() + gssp.InternalClose() +} + +func makeBackupMetadata( + ctx context.Context, flowCtx *execinfra.FlowCtx, spec *execinfrapb.GenerativeSplitAndScatterSpec, +) ([]backuppb.BackupManifest, layerToBackupManifestFileIterFactory, error) { + + execCfg := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig) + + kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig, + execCfg.InternalDB, spec.User()) + + backupManifests, _, err := backupinfo.LoadBackupManifestsAtTime(ctx, nil, spec.URIs, + spec.User(), execCfg.DistSQLSrv.ExternalStorageFromURI, spec.Encryption, &kmsEnv, spec.EndTime) + if err != nil { + return nil, nil, err + } + + layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, execCfg, + backupManifests, spec.Encryption, &kmsEnv) + if err != nil { + return nil, nil, err + } + + return backupManifests, layerToBackupManifestFileIterFactory, nil +} + +type restoreEntryChunk struct { + entries []execinfrapb.RestoreSpanEntry + splitKey roachpb.Key +} + +func runGenerativeSplitAndScatter( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + spec *execinfrapb.GenerativeSplitAndScatterSpec, + scatterer splitAndScatterer, + doneScatterCh chan<- entryNode, +) error { + log.Infof(ctx, "Running generative split and scatter with %d total spans, %d chunk size, %d nodes", + spec.NumEntries, spec.ChunkSize, spec.NumNodes) + g := ctxgroup.WithContext(ctx) + + splitWorkers := int(spec.NumNodes) + restoreSpanEntriesCh := make(chan execinfrapb.RestoreSpanEntry, splitWorkers*int(spec.ChunkSize)) + g.GoCtx(func(ctx context.Context) error { + defer close(restoreSpanEntriesCh) + + backups, layerToFileIterFactory, err := makeBackupMetadata(ctx, + flowCtx, spec) + if err != nil { + return err + } + + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, spec.EndTime) + if err != nil { + return err + } + + backupLocalityMap, err := makeBackupLocalityMap(spec.BackupLocalityInfo, spec.User()) + if err != nil { + return err + } + + return generateAndSendImportSpans( + ctx, + spec.Spans, + backups, + layerToFileIterFactory, + backupLocalityMap, + introducedSpanFrontier, + spec.HighWater, + spec.TargetSize, + restoreSpanEntriesCh, + spec.UseSimpleImportSpans, + ) + }) + + restoreEntryChunksCh := make(chan restoreEntryChunk, splitWorkers) + g.GoCtx(func(ctx context.Context) error { + defer close(restoreEntryChunksCh) + + var idx int64 + var chunk restoreEntryChunk + for entry := range restoreSpanEntriesCh { + entry.ProgressIdx = idx + idx++ + if len(chunk.entries) == int(spec.ChunkSize) { + chunk.splitKey = entry.Span.Key + restoreEntryChunksCh <- chunk + chunk = restoreEntryChunk{} + } + chunk.entries = append(chunk.entries, entry) + } + + if len(chunk.entries) > 0 { + restoreEntryChunksCh <- chunk + } + return nil + }) + + importSpanChunksCh := make(chan scatteredChunk, splitWorkers*2) + g2 := ctxgroup.WithContext(ctx) + for worker := 0; worker < splitWorkers; worker++ { + g2.GoCtx(func(ctx context.Context) error { + // Chunks' leaseholders should be randomly placed throughout the + // cluster. + for importSpanChunk := range restoreEntryChunksCh { + scatterKey := importSpanChunk.entries[0].Span.Key + if !importSpanChunk.splitKey.Equal(roachpb.Key{}) { + // Split at the start of the next chunk, to partition off a + // prefix of the space to scatter. 
+ if err := scatterer.split(ctx, flowCtx.Codec(), importSpanChunk.splitKey); err != nil { + return err + } + } + chunkDestination, err := scatterer.scatter(ctx, flowCtx.Codec(), scatterKey) + if err != nil { + return err + } + if chunkDestination == 0 { + // If scatter failed to find a node for range ingestion, route the range + // to the node currently running the split and scatter processor. + if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); ok { + chunkDestination = nodeID + log.Warningf(ctx, "scatter returned node 0. "+ + "Route span starting at %s to current node %v", scatterKey, nodeID) + } else { + log.Warningf(ctx, "scatter returned node 0. "+ + "Route span starting at %s to default stream", scatterKey) + } + } + + sc := scatteredChunk{ + destination: chunkDestination, + entries: importSpanChunk.entries, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case importSpanChunksCh <- sc: + } + } + return nil + }) + } + + g.GoCtx(func(ctx context.Context) error { + defer close(importSpanChunksCh) + return g2.Wait() + }) + + // TODO(pbardea): This tries to cover for a bad scatter by having 2 * the + // number of nodes in the cluster. Is it necessary? + splitScatterWorkers := 2 * splitWorkers + for worker := 0; worker < splitScatterWorkers; worker++ { + g.GoCtx(func(ctx context.Context) error { + for importSpanChunk := range importSpanChunksCh { + chunkDestination := importSpanChunk.destination + for i, importEntry := range importSpanChunk.entries { + nextChunkIdx := i + 1 + + log.VInfof(ctx, 2, "processing a span [%s,%s)", importEntry.Span.Key, importEntry.Span.EndKey) + var splitKey roachpb.Key + if nextChunkIdx < len(importSpanChunk.entries) { + // Split at the next entry. + splitKey = importSpanChunk.entries[nextChunkIdx].Span.Key + if err := scatterer.split(ctx, flowCtx.Codec(), splitKey); err != nil { + return err + } + } + + scatteredEntry := entryNode{ + entry: importEntry, + node: chunkDestination, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case doneScatterCh <- scatteredEntry: + } + } + } + return nil + }) + } + + return g.Wait() +} + +func init() { + rowexec.NewGenerativeSplitAndScatterProcessor = newGenerativeSplitAndScatterProcessor +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 863b6d193365..ce93e174359d 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -77,6 +78,13 @@ import ( // tables we process in a single txn when restoring their table statistics. var restoreStatsInsertBatchSize = 10 +var useSimpleImportSpans = settings.RegisterBoolSetting( + settings.TenantWritable, + "bulkio.restore.use_simple_import_spans", + "if set to true, restore will generate its import spans using the makeSimpleImportSpans algorithm", + false, +) + // rewriteBackupSpanKey rewrites a backup span start key for the purposes of // splitting up the target key-space to send out the actual work of restoring. 
// @@ -257,15 +265,6 @@ func restore( return emptyRowCount, nil } - mu := struct { - syncutil.Mutex - highWaterMark int - res roachpb.RowCount - requestsCompleted []bool - }{ - highWaterMark: -1, - } - backupLocalityMap, err := makeBackupLocalityMap(backupLocalityInfo, user) if err != nil { return emptyRowCount, errors.Wrap(err, "resolving locality locations") @@ -289,53 +288,65 @@ func restore( if err != nil { return emptyRowCount, err } - importSpans, err := makeSimpleImportSpans( - dataToRestore.getSpans(), - backupManifests, - layerToBackupManifestFileIterFactory, - backupLocalityMap, - introducedSpanFrontier, - highWaterMark, - targetRestoreSpanSize.Get(execCtx.ExecCfg().SV())) - if err != nil { - return emptyRowCount, err - } - if len(importSpans) == 0 { - // There are no files to restore. - return emptyRowCount, nil - } + simpleImportSpans := useSimpleImportSpans.Get(&execCtx.ExecCfg().Settings.SV) - for i := range importSpans { - importSpans[i].ProgressIdx = int64(i) + mu := struct { + syncutil.Mutex + highWaterMark int64 + ceiling int64 + res roachpb.RowCount + // As part of job progress tracking, inFlightImportSpans tracks all the + // spans that have been generated are being processed by the processors in + // distRestore. requestsCompleleted tracks the spans from + // inFlightImportSpans that have completed its processing. Once all spans up + // to index N have been processed (and appear in requestsCompleted), then + // any spans with index < N will be removed from both inFlightImportSpans + // and requestsCompleted maps. + inFlightImportSpans map[int64]roachpb.Span + requestsCompleted map[int64]bool + }{ + highWaterMark: -1, + ceiling: 0, + inFlightImportSpans: make(map[int64]roachpb.Span), + requestsCompleted: make(map[int64]bool), + } + + targetSize := targetRestoreSpanSize.Get(&execCtx.ExecCfg().Settings.SV) + importSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000) + genSpan := func(ctx context.Context) error { + defer close(importSpanCh) + return generateAndSendImportSpans( + restoreCtx, + dataToRestore.getSpans(), + backupManifests, + layerToBackupManifestFileIterFactory, + backupLocalityMap, + introducedSpanFrontier, + highWaterMark, + targetSize, + importSpanCh, + simpleImportSpans, + ) } - mu.requestsCompleted = make([]bool, len(importSpans)) - // TODO(pbardea): This not super principled. I just wanted something that - // wasn't a constant and grew slower than linear with the length of - // importSpans. It seems to be working well for BenchmarkRestore2TB but - // worth revisiting. - // It tries to take the cluster size into account so that larger clusters - // distribute more chunks amongst them so that after scattering there isn't - // a large varience in the distribution of entries. - chunkSize := int(math.Sqrt(float64(len(importSpans)))) / numNodes - if chunkSize == 0 { - chunkSize = 1 - } - importSpanChunks := make([][]execinfrapb.RestoreSpanEntry, 0, len(importSpans)/chunkSize) - for start := 0; start < len(importSpans); { - importSpanChunk := importSpans[start:] - end := start + chunkSize - if end < len(importSpans) { - importSpanChunk = importSpans[start:end] + // Count number of import spans. 
+ var numImportSpans int + var countTasks []func(ctx context.Context) error + log.Infof(restoreCtx, "rh_debug: starting count task") + spanCountTask := func(ctx context.Context) error { + for range importSpanCh { + numImportSpans++ } - importSpanChunks = append(importSpanChunks, importSpanChunk) - start = end + return nil + } + countTasks = append(countTasks, genSpan, spanCountTask) + if err := ctxgroup.GoAndWait(restoreCtx, countTasks...); err != nil { + return emptyRowCount, errors.Wrapf(err, "counting number of import spans") } - requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block - progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - + importSpanCh = make(chan execinfrapb.RestoreSpanEntry, 1000) + requestFinishedCh := make(chan struct{}, numImportSpans) // enough buffer to never block // tasks are the concurrent tasks that are run during the restore. var tasks []func(ctx context.Context) error if dataToRestore.isMainBundle() { @@ -344,13 +355,13 @@ func restore( // cluster restores) may be restored first. When restoring that data, we // don't want to update the high-water mark key, so instead progress is just // defined on the main data bundle (of which there should only be one). - progressLogger := jobs.NewChunkProgressLogger(job, len(importSpans), job.FractionCompleted(), + progressLogger := jobs.NewChunkProgressLogger(job, numImportSpans, job.FractionCompleted(), func(progressedCtx context.Context, details jobspb.ProgressDetails) { switch d := details.(type) { case *jobspb.Progress_Restore: mu.Lock() if mu.highWaterMark >= 0 { - d.Restore.HighWater = importSpans[mu.highWaterMark].Span.Key + d.Restore.HighWater = mu.inFlightImportSpans[mu.highWaterMark].Key } mu.Unlock() default: @@ -366,10 +377,10 @@ func restore( tasks = append(tasks, jobProgressLoop) } - jobCheckpointLoop := func(ctx context.Context) error { + progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) + + generativeCheckpointLoop := func(ctx context.Context) error { defer close(requestFinishedCh) - // When a processor is done importing a span, it will send a progress update - // to progCh. for progress := range progCh { mu.Lock() var progDetails backuppb.RestoreProgress @@ -380,16 +391,32 @@ func restore( mu.res.Add(progDetails.Summary) idx := progDetails.ProgressIdx - // Assert that we're actually marking the correct span done. See #23977. - if !importSpans[progDetails.ProgressIdx].Span.Key.Equal(progDetails.DataSpan.Key) { - mu.Unlock() - return errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, importSpans[idx], - ) + if idx >= mu.ceiling { + for i := mu.ceiling; i <= idx; i++ { + importSpan := <-importSpanCh + mu.inFlightImportSpans[i] = importSpan.Span + } + mu.ceiling = idx + 1 } - mu.requestsCompleted[idx] = true - for j := mu.highWaterMark + 1; j < len(mu.requestsCompleted) && mu.requestsCompleted[j]; j++ { - mu.highWaterMark = j + + if sp, ok := mu.inFlightImportSpans[idx]; ok { + // Assert that we're actually marking the correct span done. See #23977. 
+ if !sp.Key.Equal(progDetails.DataSpan.Key) { + mu.Unlock() + return errors.Newf("request %d for span %v does not match import span for same idx: %v", + idx, progDetails.DataSpan, sp, + ) + } + mu.requestsCompleted[idx] = true + prevHighWater := mu.highWaterMark + for j := mu.highWaterMark + 1; j < mu.ceiling && mu.requestsCompleted[j]; j++ { + mu.highWaterMark = j + } + + for j := prevHighWater; j < mu.highWaterMark; j++ { + delete(mu.requestsCompleted, j) + delete(mu.inFlightImportSpans, j) + } } mu.Unlock() @@ -399,14 +426,13 @@ func restore( } return nil } - tasks = append(tasks, jobCheckpointLoop) + tasks = append(tasks, generativeCheckpointLoop, genSpan) runRestore := func(ctx context.Context) error { return distRestore( ctx, execCtx, int64(job.ID()), - importSpanChunks, dataToRestore.getPKIDs(), encryption, kmsEnv, @@ -414,6 +440,14 @@ func restore( dataToRestore.getTenantRekeys(), endTime, dataToRestore.isValidateOnly(), + details.URIs, + dataToRestore.getSpans(), + backupLocalityInfo, + highWaterMark, + targetSize, + numNodes, + numImportSpans, + simpleImportSpans, progCh, ) } @@ -423,7 +457,7 @@ func restore( // This leaves the data that did get imported in case the user wants to // retry. // TODO(dan): Build tooling to allow a user to restart a failed restore. - return emptyRowCount, errors.Wrapf(err, "importing %d ranges", len(importSpans)) + return emptyRowCount, errors.Wrapf(err, "importing %d ranges", numImportSpans) } return mu.res, nil diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 225949930cd2..08ac4f069f45 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -11,6 +11,7 @@ package backupccl import ( "bytes" "context" + "math" "sort" "time" @@ -60,7 +61,6 @@ func distRestore( ctx context.Context, execCtx sql.JobExecContext, jobID int64, - chunks [][]execinfrapb.RestoreSpanEntry, pkIDs map[uint64]bool, encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, @@ -68,6 +68,14 @@ func distRestore( tenantRekeys []execinfrapb.TenantRekey, restoreTime hlc.Timestamp, validateOnly bool, + uris []string, + requiredSpans []roachpb.Span, + backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + lowWaterMark roachpb.Key, + targetSize int64, + numNodes int, + numImportSpans int, + useSimpleImportSpans bool, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, ) error { defer close(progCh) @@ -107,12 +115,6 @@ func distRestore( p := planCtx.NewPhysicalPlan() - splitAndScatterSpecs, err := makeSplitAndScatterSpecs(sqlInstanceIDs, chunks, tableRekeys, - tenantRekeys, validateOnly) - if err != nil { - return nil, nil, err - } - restoreDataSpec := execinfrapb.RestoreDataSpec{ JobID: jobID, RestoreTime: restoreTime, @@ -123,12 +125,6 @@ func distRestore( ValidateOnly: validateOnly, } - if len(splitAndScatterSpecs) == 0 { - // We should return an error here as there are no nodes that are compatible, - // but we should have at least found ourselves. - return nil, nil, errors.AssertionFailedf("no compatible nodes") - } - // Plan SplitAndScatter in a round-robin fashion. 
splitAndScatterStageID := p.NewStageOnNodes(sqlInstanceIDs) splitAndScatterProcs := make(map[base.SQLInstanceID]physicalplan.ProcessorIdx) @@ -162,33 +158,55 @@ func distRestore( return bytes.Compare(rangeRouterSpec.Spans[i].Start, rangeRouterSpec.Spans[j].Start) == -1 }) - for _, n := range sqlInstanceIDs { - spec := splitAndScatterSpecs[n] - if spec == nil { - // We may have fewer chunks than we have nodes for very small imports. In - // this case we only want to plan splitAndScatter nodes on a subset of - // nodes. Note that we still want to plan a RestoreData processor on every - // node since each entry could be scattered anywhere. - continue - } - proc := physicalplan.Processor{ - SQLInstanceID: n, - Spec: execinfrapb.ProcessorSpec{ - Core: execinfrapb.ProcessorCoreUnion{SplitAndScatter: splitAndScatterSpecs[n]}, - Post: execinfrapb.PostProcessSpec{}, - Output: []execinfrapb.OutputRouterSpec{ - { - Type: execinfrapb.OutputRouterSpec_BY_RANGE, - RangeRouterSpec: rangeRouterSpec, - }, + // TODO(pbardea): This not super principled. I just wanted something that + // wasn't a constant and grew slower than linear with the length of + // importSpans. It seems to be working well for BenchmarkRestore2TB but + // worth revisiting. + // It tries to take the cluster size into account so that larger clusters + // distribute more chunks amongst them so that after scattering there isn't + // a large varience in the distribution of entries. + chunkSize := int(math.Sqrt(float64(numImportSpans))) / numNodes + if chunkSize == 0 { + chunkSize = 1 + } + + id := execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID() + + spec := &execinfrapb.GenerativeSplitAndScatterSpec{ + TableRekeys: tableRekeys, + TenantRekeys: tenantRekeys, + ValidateOnly: validateOnly, + URIs: uris, + Encryption: encryption, + EndTime: restoreTime, + Spans: requiredSpans, + BackupLocalityInfo: backupLocalityInfo, + HighWater: lowWaterMark, + UserProto: execCtx.User().EncodeProto(), + TargetSize: targetSize, + ChunkSize: int64(chunkSize), + NumEntries: int64(numImportSpans), + NumNodes: int64(numNodes), + UseSimpleImportSpans: useSimpleImportSpans, + } + + proc := physicalplan.Processor{ + SQLInstanceID: id, + Spec: execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{GenerativeSplitAndScatter: spec}, + Post: execinfrapb.PostProcessSpec{}, + Output: []execinfrapb.OutputRouterSpec{ + { + Type: execinfrapb.OutputRouterSpec_BY_RANGE, + RangeRouterSpec: rangeRouterSpec, }, - StageID: splitAndScatterStageID, - ResultTypes: splitAndScatterOutputTypes, }, - } - pIdx := p.AddProcessor(proc) - splitAndScatterProcs[n] = pIdx + StageID: splitAndScatterStageID, + ResultTypes: splitAndScatterOutputTypes, + }, } + pIdx := p.AddProcessor(proc) + splitAndScatterProcs[id] = pIdx // Plan RestoreData. restoreDataStageID := p.NewStageOnNodes(sqlInstanceIDs) @@ -284,34 +302,3 @@ func distRestore( return g.Wait() } - -// makeSplitAndScatterSpecs returns a map from nodeID to the SplitAndScatter -// spec that should be planned on that node. Given the chunks of ranges to -// import it round-robin distributes the chunks amongst the given nodes. 
-func makeSplitAndScatterSpecs( - sqlInstanceIDs []base.SQLInstanceID, - chunks [][]execinfrapb.RestoreSpanEntry, - tableRekeys []execinfrapb.TableRekey, - tenantRekeys []execinfrapb.TenantRekey, - validateOnly bool, -) (map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec, error) { - specsBySQLInstanceID := make(map[base.SQLInstanceID]*execinfrapb.SplitAndScatterSpec) - for i, chunk := range chunks { - sqlInstanceID := sqlInstanceIDs[i%len(sqlInstanceIDs)] - if spec, ok := specsBySQLInstanceID[sqlInstanceID]; ok { - spec.Chunks = append(spec.Chunks, execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{ - Entries: chunk, - }) - } else { - specsBySQLInstanceID[sqlInstanceID] = &execinfrapb.SplitAndScatterSpec{ - Chunks: []execinfrapb.SplitAndScatterSpec_RestoreEntryChunk{{ - Entries: chunk, - }}, - TableRekeys: tableRekeys, - TenantRekeys: tenantRekeys, - ValidateOnly: validateOnly, - } - } - } - return specsBySQLInstanceID, nil -} diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index fb53655a2a98..b4abf18fa247 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -9,6 +9,7 @@ package backupccl import ( + "container/heap" "context" "sort" @@ -64,8 +65,10 @@ var targetRestoreSpanSize = settings.RegisterByteSizeSetting( // the `BackupManifest_Files` field of a manifest. type backupManifestFileIterator interface { next() (backuppb.BackupManifest_File, bool) + peek() (backuppb.BackupManifest_File, bool) err() error close() + reset() } // inMemoryFileIterator iterates over the `BackupManifest_Files` field stored @@ -76,11 +79,16 @@ type inMemoryFileIterator struct { } func (i *inMemoryFileIterator) next() (backuppb.BackupManifest_File, bool) { + f, hasNext := i.peek() + i.curIdx++ + return f, hasNext +} + +func (i *inMemoryFileIterator) peek() (backuppb.BackupManifest_File, bool) { if i.curIdx >= len(i.manifest.Files) { return backuppb.BackupManifest_File{}, false } f := i.manifest.Files[i.curIdx] - i.curIdx++ return f, true } @@ -90,6 +98,10 @@ func (i *inMemoryFileIterator) err() error { func (i *inMemoryFileIterator) close() {} +func (i *inMemoryFileIterator) reset() { + i.curIdx = 0 +} + var _ backupManifestFileIterator = &inMemoryFileIterator{} // makeBackupManifestFileIterator returns a backupManifestFileIterator that can @@ -138,19 +150,34 @@ type sstFileIterator struct { } func (s *sstFileIterator) next() (backuppb.BackupManifest_File, bool) { - var file backuppb.BackupManifest_File - hasNext := s.fi.Next(&file) - return file, hasNext + f, ok := s.peek() + if ok { + s.fi.Next() + } + + return f, ok +} + +func (s *sstFileIterator) peek() (backuppb.BackupManifest_File, bool) { + if ok, _ := s.fi.Valid(); !ok { + return backuppb.BackupManifest_File{}, false + } + return *s.fi.Value(), true } func (s *sstFileIterator) err() error { - return s.fi.Err() + _, err := s.fi.Valid() + return err } func (s *sstFileIterator) close() { s.fi.Close() } +func (s *sstFileIterator) reset() { + s.fi.Reset() +} + var _ backupManifestFileIterator = &sstFileIterator{} // makeSimpleImportSpans partitions the spans of requiredSpans into a covering @@ -382,3 +409,449 @@ func getBackupManifestFileIters( return layerToFileIterFactory, nil } + +// generateAndSendImportSpans partitions the spans of requiredSpans into a +// covering of RestoreSpanEntry's which each have all overlapping files from the +// passed backups assigned to them. 
The spans of requiredSpans are +// trimmed/removed based on the lowWaterMark before the covering for them is +// generated. These spans are generated one at a time and then sent to spanCh. +// +// Consider a chain of backups with files f1, f2… which cover spans as follows: +// +// backup +// 0| a___1___c c__2__e h__3__i +// 1| b___4___d g____5___i +// 2| a___________6______________h j_7_k +// 3| h_8_i l_9_m +// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// +// spans: |-------span1-------||---span2---| |---span3---| +// +// The cover for those spans would look like: +// +// [a, b): 1, 6 +// [b, c): 1, 4, 6 +// [c, f): 2, 4, 6 +// [f, g): 6 +// [g, h): 5, 6 +// [h, i): 3, 5, 8 +// [l, m): 9 +// +// This cover is created by iterating through the start and end keys of all the +// files in the backup in key order via fileSpanStartAndEndKeyIterator. The +// cover spans are initially just the spans between each pair of adjacent keys +// yielded by the iterator. We then iterate through each cover span and find all +// the overlapping files. If the files that overlap a cover span is a subset of +// the files that overlap the span before it, then the two spans are merged into +// a single span in the final cover. Additionally, if targetSize > 0, we can +// merge the current cover span with the previous cover span if the merged set +// of files have a total data size below the target size. +// +// The above example is tested in TestRestoreEntryCoverExample. +// +// If useSimpleImportSpans is true, the above covering method is not used and +// the covering is created by makeSimpleImportSpans instead. +func generateAndSendImportSpans( + ctx context.Context, + requiredSpans roachpb.Spans, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + backupLocalityMap map[int]storeByLocalityKV, + introducedSpanFrontier *spanUtils.Frontier, + lowWaterMark roachpb.Key, + targetSize int64, + spanCh chan execinfrapb.RestoreSpanEntry, + useSimpleImportSpans bool, +) error { + if useSimpleImportSpans { + importSpans, err := makeSimpleImportSpans(requiredSpans, backups, layerToBackupManifestFileIterFactory, backupLocalityMap, introducedSpanFrontier, lowWaterMark, targetSize) + if err != nil { + return err + } + + for _, sp := range importSpans { + spanCh <- sp + } + return nil + } + + startEndKeyIt, err := newFileSpanStartAndEndKeyIterator(backups, layerToBackupManifestFileIterFactory) + if err != nil { + return err + } + + fileIterByLayer := make([]backupManifestFileIterator, 0, len(backups)) + for layer := range backups { + iter, err := layerToBackupManifestFileIterFactory[layer]() + if err != nil { + return err + } + + fileIterByLayer = append(fileIterByLayer, iter) + } + + // lastCovSpanSize is the size of files added to the right-most span of + // the cover so far. 
+ var lastCovSpanSize int64 + var lastCovSpan roachpb.Span + var covFilesByLayer [][]backuppb.BackupManifest_File + var firstInSpan bool + + flush := func() { + entry := execinfrapb.RestoreSpanEntry{ + Span: lastCovSpan, + } + + for layer := range covFilesByLayer { + for _, f := range covFilesByLayer[layer] { + fileSpec := execinfrapb.RestoreFileSpec{Path: f.Path, Dir: backups[layer].Dir} + if dir, ok := backupLocalityMap[layer][f.LocalityKV]; ok { + fileSpec = execinfrapb.RestoreFileSpec{Path: f.Path, Dir: dir} + } + entry.Files = append(entry.Files, fileSpec) + } + } + + if len(entry.Files) > 0 { + spanCh <- entry + } + } + + for _, span := range requiredSpans { + firstInSpan = true + if span.EndKey.Compare(lowWaterMark) < 0 { + continue + } + if span.Key.Compare(lowWaterMark) < 0 { + span.Key = lowWaterMark + } + + layersCoveredLater := make(map[int]bool) + for layer := range backups { + var coveredLater bool + introducedSpanFrontier.SpanEntries(span, func(s roachpb.Span, + ts hlc.Timestamp) (done spanUtils.OpResult) { + if backups[layer].EndTime.Less(ts) { + coveredLater = true + } + return spanUtils.StopMatch + }) + if coveredLater { + // Don't use this backup to cover this span if the span was reintroduced + // after the backup's endTime. In this case, this backup may have + // invalid data, and further, a subsequent backup will contain all of + // this span's data. Consider the following example: + // + // T0: Begin IMPORT INTO on existing table foo, ingest some data + // T1: Backup foo + // T2: Rollback IMPORT via clearRange + // T3: Incremental backup of foo, with a full reintroduction of foo’s span + // T4: RESTORE foo: should only restore foo from the incremental backup. + // If data from the full backup were also restored, + // the imported-but-then-clearRanged data will leak in the restored cluster. + // This logic seeks to avoid this form of data corruption. 
+ layersCoveredLater[layer] = true + } + } + + for { + if ok, err := startEndKeyIt.valid(); !ok { + if err != nil { + return err + } + break + } + + key := startEndKeyIt.value() + if span.Key.Compare(key) >= 0 { + startEndKeyIt.next() + continue + } + + var coverSpan roachpb.Span + if firstInSpan { + coverSpan.Key = span.Key + } else { + coverSpan.Key = lastCovSpan.EndKey + } + + if span.ContainsKey(key) { + coverSpan.EndKey = startEndKeyIt.value() + } else { + coverSpan.EndKey = span.EndKey + } + + newFilesByLayer, err := getNewIntersectingFilesByLayer(coverSpan, layersCoveredLater, fileIterByLayer) + if err != nil { + return err + } + + var filesByLayer [][]backuppb.BackupManifest_File + var covSize int64 + var newCovFilesSize int64 + + for layer := range newFilesByLayer { + for _, file := range newFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + newCovFilesSize += sz + } + filesByLayer = append(filesByLayer, newFilesByLayer[layer]) + } + + for layer := range covFilesByLayer { + for _, file := range covFilesByLayer[layer] { + sz := file.EntryCounts.DataSize + if sz == 0 { + sz = 16 << 20 + } + + if coverSpan.Overlaps(file.Span) { + covSize += sz + filesByLayer[layer] = append(filesByLayer[layer], file) + } + } + } + + if covFilesByLayer == nil { + covFilesByLayer = newFilesByLayer + lastCovSpan = coverSpan + lastCovSpanSize = newCovFilesSize + } else { + if (newCovFilesSize == 0 || lastCovSpanSize+newCovFilesSize <= targetSize) && !firstInSpan { + // If there are no new files that cover this span or if we can add the + // files in the new span's cover to the last span's cover and still stay + // below targetSize, then we should merge the two spans. + for layer := range newFilesByLayer { + covFilesByLayer[layer] = append(covFilesByLayer[layer], newFilesByLayer[layer]...) + } + lastCovSpan.EndKey = coverSpan.EndKey + lastCovSpanSize = lastCovSpanSize + newCovFilesSize + } else { + flush() + lastCovSpan = coverSpan + covFilesByLayer = filesByLayer + lastCovSpanSize = covSize + } + } + firstInSpan = false + + if lastCovSpan.EndKey.Equal(span.EndKey) { + break + } + + startEndKeyIt.next() + } + } + + flush() + return nil +} + +// fileSpanStartAndEndKeyIterator yields (almost) all of the start and end keys +// of the spans from the files in a backup chain in key order. A start or end +// key from a file span will be yielded by the iterator if the key is not +// covered by another file span within the same layer before it in FileCmp +// order. In particular, this means that if all layers in a backup chain have +// files with non-overlapping spans, then this iterator would return all start +// and end keys for all file spans in order. For example: +// +// backup +// 0| a___1___c c__2__e h__3__i +// 1| b___4___d g____5___i +// 2| a___________6______________h j_7_k +// 3| h_8_i l_9_m +// keys--a---b---c---d---e---f---g---h----i---j---k---l----m------p----> +// +// In this case, since no file span overlaps with another file span within the same layer, +// the iterator will yield all start and end keys: +// [a, b, c, d, e, g, h, i, j, k, l, m] +// +// Another example, but with file spans that do overlap within a layer: +// +// backup +// 0| a___1___c +// 1| b_____2_____e +// | d___3___f +// 2| a___________4___________g +// 3| +// keys--a---b---c---d---e---f---g---> +// +// In this case, there is overlap between files 2 and 3 within layer 1. 
Since +// the start and end keys 'b' and 'e' of file 2 will be yielded by the iterator +// since there are no files before it within the same layer. Start key 'd' of +// file 3 will not be yielded since it's covered by 2's span. The end key 'f' +// will still be yielded since it's not covered by 2's span. So the iterator +// will yield: +// [a, b, c, e, f, g] +type fileSpanStartAndEndKeyIterator struct { + heap *fileHeap + allIters []backupManifestFileIterator + err error +} + +func newFileSpanStartAndEndKeyIterator( + backups []backuppb.BackupManifest, layerToIterFactory layerToBackupManifestFileIterFactory, +) (*fileSpanStartAndEndKeyIterator, error) { + it := &fileSpanStartAndEndKeyIterator{} + for layer := range backups { + iter, err := layerToIterFactory[layer]() + if err != nil { + return nil, err + } + + it.allIters = append(it.allIters, iter) + } + it.reset() + return it, nil +} + +func (i *fileSpanStartAndEndKeyIterator) next() { + if ok, _ := i.valid(); !ok { + return + } + + prevKey := i.value() + for i.heap.Len() > 0 { + minItem := heap.Pop(i.heap).(fileHeapItem) + + curKey := minItem.key() + if curKey.Compare(prevKey) > 0 { + heap.Push(i.heap, minItem) + break + } + + if minItem.cmpEndKey { + file, ok := minItem.fileIter.next() + if err := minItem.fileIter.err(); err != nil { + i.err = err + return + } + if ok { + minItem.cmpEndKey = false + minItem.file = file + heap.Push(i.heap, minItem) + } + } else { + minItem.cmpEndKey = true + heap.Push(i.heap, minItem) + } + } +} + +func (i *fileSpanStartAndEndKeyIterator) valid() (bool, error) { + if i.err != nil { + return false, i.err + } + return i.heap.Len() > 0, nil +} + +func (i *fileSpanStartAndEndKeyIterator) value() roachpb.Key { + if ok, _ := i.valid(); !ok { + return nil + } + + return i.heap.fileHeapItems[0].key() +} +func (i *fileSpanStartAndEndKeyIterator) reset() { + i.heap = &fileHeap{} + i.err = nil + + for _, iter := range i.allIters { + iter.reset() + + file, ok := iter.next() + if err := iter.err(); err != nil { + i.err = err + return + } + if ok { + i.heap.fileHeapItems = append(i.heap.fileHeapItems, fileHeapItem{ + fileIter: iter, + file: file, + cmpEndKey: false, + }) + } + } + heap.Init(i.heap) +} + +type fileHeapItem struct { + fileIter backupManifestFileIterator + file backuppb.BackupManifest_File + cmpEndKey bool +} + +func (f fileHeapItem) key() roachpb.Key { + if f.cmpEndKey { + return f.file.Span.EndKey + } + return f.file.Span.Key +} + +type fileHeap struct { + fileHeapItems []fileHeapItem +} + +func (f *fileHeap) Len() int { + return len(f.fileHeapItems) +} + +func (f *fileHeap) Less(i, j int) bool { + return f.fileHeapItems[i].key().Compare(f.fileHeapItems[j].key()) < 0 +} + +func (f *fileHeap) Swap(i, j int) { + f.fileHeapItems[i], f.fileHeapItems[j] = f.fileHeapItems[j], f.fileHeapItems[i] +} + +func (f *fileHeap) Push(x any) { + item, ok := x.(fileHeapItem) + if !ok { + panic("pushed value not fileHeapItem") + } + + f.fileHeapItems = append(f.fileHeapItems, item) +} + +func (f *fileHeap) Pop() any { + old := f.fileHeapItems + n := len(old) + item := old[n-1] + f.fileHeapItems = old[0 : n-1] + return item +} + +func getNewIntersectingFilesByLayer( + span roachpb.Span, layersCoveredLater map[int]bool, fileIters []backupManifestFileIterator, +) ([][]backuppb.BackupManifest_File, error) { + var files [][]backuppb.BackupManifest_File + + for l, iter := range fileIters { + var layerFiles []backuppb.BackupManifest_File + if !layersCoveredLater[l] { + for ; ; iter.next() { + f, ok := iter.peek() + if !ok { + 
break + } + + if span.Overlaps(f.Span) { + layerFiles = append(layerFiles, f) + } + + if span.EndKey.Compare(f.Span.Key) <= 0 { + break + } + } + if iter.err() != nil { + return nil, iter.err() + } + } + files = append(files, layerFiles) + } + + return files, nil +} diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index afa06a8474b5..36179e309fda 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -138,13 +139,12 @@ func MockBackupChain( // iterating through the partitions of the cover and removing that partition's // span from the group for every file specified by that partition, and then // checking that all the groups are empty, indicating no needed span was missed. -// It also checks that each file that the cover has an expected number of -// partitions (i.e. isn't just one big partition of all files), by comparing its -// length to the number of files a file's end key was greater than any prior end -// key when walking files in order by start key in the backups. This check is -// thus sensitive to ordering; the coverage correctness check however is not. // // The function also verifies that a cover does not cross a span boundary. +// +// TODO(rui): this check previously contained a partition count check. +// Partitions are now generated differently, so this is a reminder to add this +// check back in when I figure out what the expected partition count should be. 
func checkRestoreCovering( ctx context.Context, backups []backuppb.BackupManifest, @@ -153,7 +153,6 @@ func checkRestoreCovering( merged bool, storageFactory cloud.ExternalStorageFactory, ) error { - var expectedPartitions int required := make(map[string]*roachpb.SpanGroup) introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) @@ -194,7 +193,6 @@ func checkRestoreCovering( required[f.Path].Add(sp) if sp.EndKey.Compare(last) > 0 { last = sp.EndKey - expectedPartitions++ } } } @@ -224,18 +222,50 @@ func checkRestoreCovering( return errors.Errorf("file %s was supposed to cover span %s", name, missing) } } - if got := len(cov); got != expectedPartitions && !merged { - return errors.Errorf("expected %d partitions, got %d", expectedPartitions, got) - } return nil } const noSpanTargetSize = 0 +func makeImportSpans( + ctx context.Context, + spans []roachpb.Span, + backups []backuppb.BackupManifest, + layerToBackupManifestFileIterFactory layerToBackupManifestFileIterFactory, + targetSize int64, + introducedSpanFrontier *spanUtils.Frontier, + useSimpleImportSpans bool, +) ([]execinfrapb.RestoreSpanEntry, error) { + var cover []execinfrapb.RestoreSpanEntry + spanCh := make(chan execinfrapb.RestoreSpanEntry) + g := ctxgroup.WithContext(context.Background()) + g.Go(func() error { + for entry := range spanCh { + cover = append(cover, entry) + } + return nil + }) + + err := generateAndSendImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, targetSize, spanCh, useSimpleImportSpans) + close(spanCh) + + if err != nil { + return nil, err + } + if err := g.Wait(); err != nil { + return nil, err + } + return cover, nil +} + func TestRestoreEntryCoverExample(t *testing.T) { defer leaktest.AfterTest(t)() + + const numAccounts = 1 ctx := context.Background() - tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, 1, InitManualReplication) + + tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, + InitManualReplication) defer cleanupFn() sp := func(start, end string) roachpb.Span { @@ -278,23 +308,26 @@ func TestRestoreEntryCoverExample(t *testing.T) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(t, err) - cover, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, nil, - emptySpanFrontier, nil, noSpanTargetSize) + cover, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - {Span: sp("a", "c"), Files: paths("1", "4", "6")}, - {Span: sp("c", "e"), Files: paths("2", "4", "6")}, - {Span: sp("e", "f"), Files: paths("6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("a", "b"), Files: paths("1", "6")}, + {Span: sp("b", "c"), Files: paths("1", "4", "6")}, + {Span: sp("c", "f"), Files: paths("2", "4", "6")}, + {Span: sp("f", "g"), Files: paths("6")}, + {Span: sp("g", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, cover) - coverSized, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, - nil, emptySpanFrontier, nil, 2<<20) + coverSized, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, 2<<20, emptySpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ - 
{Span: sp("a", "f"), Files: paths("1", "2", "4", "6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("a", "b"), Files: paths("1", "6")}, + {Span: sp("b", "c"), Files: paths("1", "4", "6")}, + {Span: sp("c", "f"), Files: paths("2", "4", "6")}, + {Span: sp("f", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, coverSized) @@ -303,12 +336,13 @@ func TestRestoreEntryCoverExample(t *testing.T) { introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) require.NoError(t, err) - coverIntroduced, err := makeSimpleImportSpans(spans, backups, layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, - noSpanTargetSize) + coverIntroduced, err := makeImportSpans(ctx, spans, backups, layerToBackupManifestFileIterFactory, noSpanTargetSize, introducedSpanFrontier, false) require.NoError(t, err) require.Equal(t, []execinfrapb.RestoreSpanEntry{ {Span: sp("a", "f"), Files: paths("6")}, - {Span: sp("f", "i"), Files: paths("3", "5", "6", "8")}, + {Span: sp("f", "g"), Files: paths("6")}, + {Span: sp("g", "h"), Files: paths("5", "6")}, + {Span: sp("h", "i"), Files: paths("3", "5", "8")}, {Span: sp("l", "m"), Files: paths("9")}, }, coverIntroduced) @@ -515,8 +549,8 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg, backups, nil, nil) require.NoError(t, err) - cover, err := makeSimpleImportSpans(restoreSpans, backups, layerToBackupManifestFileIterFactory, - nil, introducedSpanFrontier, nil, 0) + cover, err := makeImportSpans(ctx, restoreSpans, backups, layerToBackupManifestFileIterFactory, + 0, introducedSpanFrontier, false) require.NoError(t, err) for _, reIntroTable := range reIntroducedTables { @@ -552,25 +586,26 @@ func TestRestoreEntryCover(t *testing.T) { for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} { for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} { for _, hasExternalFilesList := range []bool{true, false} { - backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) - require.NoError(t, err) - - for _, target := range []int64{0, 1, 4, 100, 1000} { - t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t", - numBackups, spans, files, target, hasExternalFilesList), func(t *testing.T) { - introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) - require.NoError(t, err) - - layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, - &execCfg, backups, nil, nil) - require.NoError(t, err) - cover, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups, - layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, - nil, target<<20) - require.NoError(t, err) - require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, - cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) - }) + for _, simpleImportSpans := range []bool{true, false} { + backups, err := MockBackupChain(ctx, numBackups, spans, files, r, hasExternalFilesList, execCfg) + require.NoError(t, err) + + for _, target := range []int64{0, 1, 4, 100, 1000} { + t.Run(fmt.Sprintf("numBackups=%d, numSpans=%d, numFiles=%d, merge=%d, slim=%t, simple=%t", + numBackups, spans, files, target, hasExternalFilesList, simpleImportSpans), func(t *testing.T) { + introducedSpanFrontier, err := createIntroducedSpanFrontier(backups, hlc.Timestamp{}) + 
+								require.NoError(t, err)
+
+								layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx,
+									&execCfg, backups, nil, nil)
+								require.NoError(t, err)
+								cover, err := makeImportSpans(ctx, backups[numBackups-1].Spans, backups,
+									layerToBackupManifestFileIterFactory, target<<20, introducedSpanFrontier, simpleImportSpans)
+								require.NoError(t, err)
+								require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans,
+									cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage))
+							})
+						}
 					}
 				}
 			}
diff --git a/pkg/sql/execinfrapb/api.go b/pkg/sql/execinfrapb/api.go
index f87b44fff2ae..ecd89228bf4f 100644
--- a/pkg/sql/execinfrapb/api.go
+++ b/pkg/sql/execinfrapb/api.go
@@ -86,3 +86,7 @@ func (m *ChangeAggregatorSpec) User() username.SQLUsername {
 func (m *ChangeFrontierSpec) User() username.SQLUsername {
 	return m.UserProto.Decode()
 }
+
+func (m *GenerativeSplitAndScatterSpec) User() username.SQLUsername {
+	return m.UserProto.Decode()
+}
diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go
index 5ef0914881e4..f0ab562aec4d 100644
--- a/pkg/sql/execinfrapb/flow_diagram.go
+++ b/pkg/sql/execinfrapb/flow_diagram.go
@@ -609,6 +609,12 @@ func (s *HashGroupJoinerSpec) summary() (string, []string) {
 	return "HashGroupJoiner", details
 }
 
+// summary implements the diagramCellType interface.
+func (g *GenerativeSplitAndScatterSpec) summary() (string, []string) {
+	detail := fmt.Sprintf("%d import spans", g.NumEntries)
+	return "GenerativeSplitAndScatterSpec", []string{detail}
+}
+
 type diagramCell struct {
 	Title   string   `json:"title"`
 	Details []string `json:"details"`
diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto
index 8b02a86a5bb5..cb5a518701e7 100644
--- a/pkg/sql/execinfrapb/processors.proto
+++ b/pkg/sql/execinfrapb/processors.proto
@@ -123,6 +123,7 @@ message ProcessorCoreUnion {
   optional IndexBackfillMergerSpec indexBackfillMerger = 38;
   optional TTLSpec ttl = 39;
   optional HashGroupJoinerSpec hashGroupJoiner = 40;
+  optional GenerativeSplitAndScatterSpec generativeSplitAndScatter = 41;
 
   reserved 6, 12, 14, 17, 18, 19, 20;
 }
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index b42d523c6dc4..0eb6c97c0fd4 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -391,3 +391,33 @@ message IndexBackfillMergerSpec {
 
   // NEXT ID: 9.
 }
+
+message GenerativeSplitAndScatterSpec {
+  repeated TableRekey table_rekeys = 1 [(gogoproto.nullable) = false];
+  repeated TenantRekey tenant_rekeys = 2 [(gogoproto.nullable) = false];
+  optional bool validate_only = 3 [(gogoproto.nullable) = false];
+
+  // URIs are the URIs of the backup manifests.
+  repeated string uris = 4 [(gogoproto.customname) = "URIs"];
+  optional jobs.jobspb.BackupEncryptionOptions encryption = 5;
+
+  // EndTime is the time of the restore.
+  optional util.hlc.Timestamp endTime = 9 [(gogoproto.nullable) = false];
+  // Spans are the required spans in the restore.
+  repeated roachpb.Span spans = 10 [(gogoproto.nullable) = false];
+  repeated jobs.jobspb.RestoreDetails.BackupLocalityInfo backup_locality_info = 11 [(gogoproto.nullable) = false];
+  // HighWater is the high watermark of the previous restore run.
+  optional bytes high_water = 12 [(gogoproto.nullable) = false];
+  // User who initiated the restore.
+  optional string user_proto = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];
+  // ChunkSize is the number of import spans per chunk.
+  optional int64 chunk_size = 14 [(gogoproto.nullable) = false];
+  // TargetSize is the target size for each import span.
+  optional int64 target_size = 15 [(gogoproto.nullable) = false];
+  // NumEntries is the total number of import spans in this restore.
+  optional int64 num_entries = 16 [(gogoproto.nullable) = false];
+  // NumNodes is the number of nodes available for dist restore.
+  optional int64 num_nodes = 17 [(gogoproto.nullable) = false];
+  optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
+  optional bool use_simple_import_spans = 19 [(gogoproto.nullable) = false];
+}
diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go
index 16f97ca0ed13..1ccf097d7e2f 100644
--- a/pkg/sql/rowexec/processors.go
+++ b/pkg/sql/rowexec/processors.go
@@ -376,6 +376,15 @@ func NewProcessor(
 		}
 		return newHashGroupJoiner(ctx, flowCtx, processorID, core.HashGroupJoiner, inputs[0], inputs[1], post, outputs[0])
 	}
+	if core.GenerativeSplitAndScatter != nil {
+		if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
+			return nil, err
+		}
+		if NewGenerativeSplitAndScatterProcessor == nil {
+			return nil, errors.New("GenerativeSplitAndScatter processor unimplemented")
+		}
+		return NewGenerativeSplitAndScatterProcessor(ctx, flowCtx, processorID, *core.GenerativeSplitAndScatter, post, outputs[0])
+	}
 	return nil, errors.Errorf("unsupported processor core %q", core)
 }
 
@@ -411,3 +420,6 @@ var NewStreamIngestionFrontierProcessor func(context.Context, *execinfra.FlowCtx
 
 // NewTTLProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
 var NewTTLProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.TTLSpec, execinfra.RowReceiver) (execinfra.Processor, error)
+
+// NewGenerativeSplitAndScatterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
+var NewGenerativeSplitAndScatterProcessor func(context.Context, *execinfra.FlowCtx, int32, execinfrapb.GenerativeSplitAndScatterSpec, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
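The rowexec change above follows the usual pattern for CCL-only processors: the open-source package declares a nil constructor variable, the CCL package that implements the processor assigns it at init time, and NewProcessor returns "unimplemented" if the hook was never injected. The following is a minimal, self-contained sketch of that pattern with toy stand-in types (Processor, the spec struct, and the names below are illustrative, not the actual CockroachDB packages):

package main

import (
	"errors"
	"fmt"
)

// Processor stands in for execinfra.Processor.
type Processor interface{ Run() }

// GenerativeSplitAndScatterSpec stands in for the generated proto struct.
type GenerativeSplitAndScatterSpec struct {
	NumEntries int64
}

// NewGenerativeSplitAndScatterProcessor is nil until an enterprise package
// injects an implementation, mirroring the hook declared in rowexec.
var NewGenerativeSplitAndScatterProcessor func(spec GenerativeSplitAndScatterSpec) (Processor, error)

// newProcessor mirrors the nil check in rowexec.NewProcessor: without the
// injected constructor the core is reported as unimplemented.
func newProcessor(spec GenerativeSplitAndScatterSpec) (Processor, error) {
	if NewGenerativeSplitAndScatterProcessor == nil {
		return nil, errors.New("GenerativeSplitAndScatter processor unimplemented")
	}
	return NewGenerativeSplitAndScatterProcessor(spec)
}

// splitAndScatterProcessor is a toy implementation of the processor.
type splitAndScatterProcessor struct{ spec GenerativeSplitAndScatterSpec }

func (p *splitAndScatterProcessor) Run() {
	fmt.Printf("scattering %d import spans\n", p.spec.NumEntries)
}

// In the real code this assignment happens in an init() inside the CCL
// package that implements the processor, which the full binary links in.
func init() {
	NewGenerativeSplitAndScatterProcessor = func(spec GenerativeSplitAndScatterSpec) (Processor, error) {
		return &splitAndScatterProcessor{spec: spec}, nil
	}
}

func main() {
	p, err := newProcessor(GenerativeSplitAndScatterSpec{NumEntries: 42})
	if err != nil {
		panic(err)
	}
	p.Run()
}

The point of the indirection is that the free rowexec package never imports CCL code, yet a DistSQL plan can still reference the GenerativeSplitAndScatter core; binaries built without the CCL implementation fail cleanly with the "unimplemented" error instead of failing to link.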