From a743d82e34f1fb08f6886d8080d4ad413baee26f Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Wed, 8 Jun 2022 14:34:27 -0400 Subject: [PATCH] backupinfo: introduce a backupinfo package The backupinfo package contains logic related to interacting with information and metadata describing the backup. After this change we have `backupdest` depending on `backupinfo`. Release note: None --- pkg/BUILD.bazel | 2 + pkg/ccl/backupccl/BUILD.bazel | 7 +- pkg/ccl/backupccl/backup_job.go | 43 +- pkg/ccl/backupccl/backup_planning.go | 135 +---- pkg/ccl/backupccl/backup_test.go | 48 +- pkg/ccl/backupccl/backupbase/constants.go | 7 +- pkg/ccl/backupccl/backupdest/BUILD.bazel | 3 + .../backupdest/backup_destination.go | 210 ++++++- .../backupccl/backupdest/incrementals_test.go | 2 +- .../backupccl/backupencryption/encryption.go | 7 + pkg/ccl/backupccl/backupinfo/BUILD.bazel | 87 +++ .../{ => backupinfo}/backup_metadata.go | 21 +- .../{ => backupinfo}/backup_metadata_test.go | 35 +- pkg/ccl/backupccl/backupinfo/main_test.go | 33 ++ .../{ => backupinfo}/manifest_handling.go | 511 ++++++++---------- pkg/ccl/backupccl/backuputils/testutils.go | 10 + pkg/ccl/backupccl/restore_job.go | 39 +- pkg/ccl/backupccl/restore_planning.go | 7 +- pkg/ccl/backupccl/restore_span_covering.go | 3 +- pkg/ccl/backupccl/show.go | 37 +- pkg/ccl/backupccl/targets.go | 7 +- pkg/ccl/backupccl/utils_test.go | 3 +- pkg/ccl/cliccl/BUILD.bazel | 1 + pkg/ccl/cliccl/debug_backup.go | 9 +- 24 files changed, 723 insertions(+), 544 deletions(-) create mode 100644 pkg/ccl/backupccl/backupinfo/BUILD.bazel rename pkg/ccl/backupccl/{ => backupinfo}/backup_metadata.go (98%) rename pkg/ccl/backupccl/{ => backupinfo}/backup_metadata_test.go (87%) create mode 100644 pkg/ccl/backupccl/backupinfo/main_test.go rename pkg/ccl/backupccl/{ => backupinfo}/manifest_handling.go (75%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index bcb4007fd292..76e79c4215e4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -10,6 +10,8 @@ 
ALL_TESTS = [ "//pkg/build/starlarkutil:starlarkutil_test", "//pkg/build/util:util_test", "//pkg/ccl/backupccl/backupdest:backupdest_test", + "//pkg/ccl/backupccl/backupinfo:backupinfo_disallowed_imports_test", + "//pkg/ccl/backupccl/backupinfo:backupinfo_test", "//pkg/ccl/backupccl/backupresolver:backupresolver_test", "//pkg/ccl/backupccl:backupccl_test", "//pkg/ccl/baseccl:baseccl_test", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 045e3dc38358..e3ca409b3d20 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -5,7 +5,6 @@ go_library( srcs = [ "alter_backup_planning.go", "backup_job.go", - "backup_metadata.go", "backup_planning.go", "backup_planning_tenant.go", "backup_processor.go", @@ -14,7 +13,6 @@ go_library( "create_scheduled_backup.go", "file_sst_sink.go", "key_rewriter.go", - "manifest_handling.go", "restoration_data.go", "restore_data_processor.go", "restore_job.go", @@ -37,6 +35,7 @@ go_library( "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/backupccl/backupdest", "//pkg/ccl/backupccl/backupencryption", + "//pkg/ccl/backupccl/backupinfo", "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/backupccl/backupresolver", "//pkg/ccl/backupccl/backuputils", @@ -112,10 +111,8 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/contextutil", "//pkg/util/ctxgroup", - "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/interval", - "//pkg/util/ioctx", "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/eventpb", @@ -144,7 +141,6 @@ go_test( "alter_backup_test.go", "backup_cloud_test.go", "backup_intents_test.go", - "backup_metadata_test.go", "backup_planning_test.go", "backup_rand_test.go", "backup_tenant_test.go", @@ -178,6 +174,7 @@ go_test( "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/backupccl/backupdest", "//pkg/ccl/backupccl/backupencryption", + "//pkg/ccl/backupccl/backupinfo", "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/kvccl", "//pkg/ccl/multiregionccl", diff --git 
a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 103cf6a1457a..c343ef3797fe 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" @@ -243,7 +244,7 @@ func backup( RevisionStartTime: backupManifest.RevisionStartTime, }) - err := writeBackupManifestCheckpoint( + err := backupinfo.WriteBackupManifestCheckpoint( ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(), ) if err != nil { @@ -290,8 +291,8 @@ func backup( // Set a unique filename for each partition backup descriptor. The ID // ensures uniqueness, and the kv string appended to the end is for // readability. 
- filename := fmt.Sprintf("%s_%d_%s", - backupPartitionDescriptorPrefix, nextPartitionedDescFilenameID, sanitizeLocalityKV(kv)) + filename := fmt.Sprintf("%s_%d_%s", backupPartitionDescriptorPrefix, + nextPartitionedDescFilenameID, backupinfo.SanitizeLocalityKV(kv)) nextPartitionedDescFilenameID++ backupManifest.PartitionDescriptorFilenames = append(backupManifest.PartitionDescriptorFilenames, filename) desc := backuppb.BackupPartitionDescriptor{ @@ -306,7 +307,7 @@ func backup( return err } defer store.Close() - return writeBackupPartitionDescriptor(ctx, store, filename, encryption, &desc) + return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename, encryption, &desc) }(); err != nil { return roachpb.RowCount{}, err } @@ -314,7 +315,8 @@ func backup( } resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"}) - if err := writeBackupManifest(ctx, settings, defaultStore, backupbase.BackupManifestName, encryption, backupManifest); err != nil { + if err := backupinfo.WriteBackupManifest(ctx, settings, defaultStore, backupbase.BackupManifestName, + encryption, backupManifest); err != nil { return roachpb.RowCount{}, err } var tableStatistics []*stats.TableStatisticProto @@ -344,12 +346,12 @@ func backup( } resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"}) - if err := writeTableStatistics(ctx, defaultStore, backupStatisticsFileName, encryption, &statsTable); err != nil { + if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &statsTable); err != nil { return roachpb.RowCount{}, err } - if writeMetadataSST.Get(&settings.SV) { - if err := writeBackupMetadataSST(ctx, defaultStore, encryption, backupManifest, tableStatistics); err != nil { + if backupinfo.WriteMetadataSST.Get(&settings.SV) { + if err := backupinfo.WriteBackupMetadataSST(ctx, defaultStore, encryption, backupManifest, tableStatistics); err != nil { err = errors.Wrap(err, "writing forward-compat metadata sst") if 
!build.IsRelease() { return roachpb.RowCount{}, err @@ -448,7 +450,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { } } - if err := writeBackupManifestCheckpoint( + if err := backupinfo.WriteBackupManifestCheckpoint( ctx, details.URI, details.EncryptionOptions, backupManifest, p.ExecCfg(), p.User(), ); err != nil { return err @@ -730,20 +732,21 @@ func (b *backupResumer) readManifestOnResume( // they could be using either the new or the old foreign key // representations. We should just preserve whatever representation the // table descriptors were using and leave them alone. - desc, memSize, err := readBackupCheckpointManifest(ctx, mem, defaultStore, backupManifestCheckpointName, - details.EncryptionOptions) + desc, memSize, err := backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore, + backupinfo.BackupManifestCheckpointName, details.EncryptionOptions) if err != nil { if !errors.Is(err, cloud.ErrFileDoesNotExist) { return nil, 0, errors.Wrapf(err, "reading backup checkpoint") } // Try reading temp checkpoint. - tmpCheckpoint := tempCheckpointFileNameForJob(b.job.ID()) - desc, memSize, err = readBackupCheckpointManifest(ctx, mem, defaultStore, tmpCheckpoint, details.EncryptionOptions) + tmpCheckpoint := backupinfo.TempCheckpointFileNameForJob(b.job.ID()) + desc, memSize, err = backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore, + tmpCheckpoint, details.EncryptionOptions) if err != nil { return nil, 0, err } // "Rename" temp checkpoint. 
- if err := writeBackupManifestCheckpoint( + if err := backupinfo.WriteBackupManifestCheckpoint( ctx, details.URI, details.EncryptionOptions, &desc, cfg, user, ); err != nil { mem.Shrink(ctx, memSize) @@ -753,8 +756,8 @@ func (b *backupResumer) readManifestOnResume( if err := defaultStore.Delete(ctx, tmpCheckpoint); err != nil { log.Errorf(ctx, "error removing temporary checkpoint %s", tmpCheckpoint) } - if err := defaultStore.Delete(ctx, backupProgressDirectory+"/"+tmpCheckpoint); err != nil { - log.Errorf(ctx, "error removing temporary checkpoint %s", backupProgressDirectory+"/"+tmpCheckpoint) + if err := defaultStore.Delete(ctx, backupinfo.BackupProgressDirectory+"/"+tmpCheckpoint); err != nil { + log.Errorf(ctx, "error removing temporary checkpoint %s", backupinfo.BackupProgressDirectory+"/"+tmpCheckpoint) } } @@ -850,18 +853,18 @@ func (b *backupResumer) deleteCheckpoint( defer exportStore.Close() // We first attempt to delete from base directory to account for older // backups, and then from the progress directory. - err = exportStore.Delete(ctx, backupManifestCheckpointName) + err = exportStore.Delete(ctx, backupinfo.BackupManifestCheckpointName) if err != nil { log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in base directory: %+v", err) } - err = exportStore.Delete(ctx, backupManifestCheckpointName+backupManifestChecksumSuffix) + err = exportStore.Delete(ctx, backupinfo.BackupManifestCheckpointName+backupinfo.BackupManifestChecksumSuffix) if err != nil { log.Warningf(ctx, "unable to delete checkpoint checksum file in base directory: %+v", err) } // Delete will not delete a nonempty directory, so we have to go through // all files and delete each file one by one. 
- return exportStore.List(ctx, backupProgressDirectory, "", func(p string) error { - return exportStore.Delete(ctx, backupProgressDirectory+p) + return exportStore.List(ctx, backupinfo.BackupProgressDirectory, "", func(p string) error { + return exportStore.Delete(ctx, backupinfo.BackupProgressDirectory+p) }) }(); err != nil { log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in progress directory: %+v", err) diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index ff6bc7947b21..2c8e668f6329 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -9,7 +9,6 @@ package backupccl import ( - "bytes" "context" "fmt" "reflect" @@ -22,12 +21,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver" - "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/cloud" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/featureflag" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -55,7 +53,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" @@ -63,8 +60,6 @@ import ( const ( backupOptRevisionHistory = "revision_history" - backupOptEncPassphrase = "encryption_passphrase" - 
backupOptEncKMS = "kms" backupOptWithPrivileges = "privileges" backupOptAsJSON = "as_json" backupOptWithDebugIDs = "debug_ids" @@ -72,6 +67,9 @@ const ( backupOptDebugMetadataSST = "debug_dump_metadata_sst" backupOptEncDir = "encryption_info_dir" backupOptCheckFiles = "check_files" + // backupPartitionDescriptorPrefix is the file name prefix for serialized + // BackupPartitionDescriptor protos. + backupPartitionDescriptorPrefix = "BACKUP_PART" ) type tableAndIndex struct { @@ -786,120 +784,6 @@ func getScheduledBackupExecutionArgsFromSchedule( return sj, args, nil } -// writeBackupManifestCheckpoint writes a new BACKUP-CHECKPOINT MANIFEST and -// CHECKSUM file. If it is a pure v22.1 cluster or later, it will write a -// timestamped BACKUP-CHECKPOINT to the /progress directory. If it is a mixed -// cluster version, it will write a non timestamped BACKUP-CHECKPOINT to the -// base directory in order to not break backup jobs that resume on a v21.2 node. -func writeBackupManifestCheckpoint( - ctx context.Context, - storageURI string, - encryption *jobspb.BackupEncryptionOptions, - desc *backuppb.BackupManifest, - execCfg *sql.ExecutorConfig, - user username.SQLUsername, -) error { - var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "write-backup-manifest-checkpoint") - defer span.Finish() - - defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user) - if err != nil { - return err - } - defer defaultStore.Close() - - sort.Sort(BackupFileDescriptors(desc.Files)) - - descBuf, err := protoutil.Marshal(desc) - if err != nil { - return err - } - - descBuf, err = compressData(descBuf) - if err != nil { - return errors.Wrap(err, "compressing backup manifest") - } - - if encryption != nil { - encryptionKey, err := backupencryption.GetEncryptionKey(ctx, encryption, execCfg.Settings, defaultStore.ExternalIOConf()) - if err != nil { - return err - } - descBuf, err = storageccl.EncryptFile(descBuf, encryptionKey) - if err != nil { - return err 
- } - } - - // If the cluster is still running on a mixed version, we want to write - // to the base directory instead of the progress directory. That way if - // an old node resumes a backup, it doesn't have to start over. - if !execCfg.Settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) { - // We want to overwrite the latest checkpoint in the base directory, - // just write to the non versioned BACKUP-CHECKPOINT file. - err = cloud.WriteFile(ctx, defaultStore, backupManifestCheckpointName, bytes.NewReader(descBuf)) - if err != nil { - return err - } - - checksum, err := getChecksum(descBuf) - if err != nil { - return err - } - - return cloud.WriteFile(ctx, defaultStore, backupManifestCheckpointName+backupManifestChecksumSuffix, bytes.NewReader(checksum)) - } - - // We timestamp the checkpoint files in order to enforce write once backups. - // When the job goes to read these timestamped files, it will List - // the checkpoints and pick the file whose name is lexicographically - // sorted to the top. This will be the last checkpoint we write, for - // details refer to newTimestampedCheckpointFileName. - filename := newTimestampedCheckpointFileName() - - // HTTP storage does not support listing and so we cannot rely on the - // above-mentioned List method to return us the latest checkpoint file. - // Instead, we will write a checkpoint once with a well-known filename, - // and teach the job to always reach for that filename in the face of - // a resume. We may lose progress, but this is a cost we are willing - // to pay to uphold write-once semantics. - if defaultStore.Conf().Provider == roachpb.ExternalStorageProvider_http { - // TODO (darryl): We should do this only for file not found or directory - // does not exist errors. As of right now we only specifically wrap - // ReadFile errors for file not found so this is not possible yet. 
- if r, err := defaultStore.ReadFile(ctx, backupProgressDirectory+"/"+backupManifestCheckpointName); err != nil { - // Since we did not find the checkpoint file this is the first time - // we are going to write a checkpoint, so write it with the well - // known filename. - filename = backupManifestCheckpointName - } else { - err = r.Close(ctx) - if err != nil { - return err - } - } - } - - err = cloud.WriteFile(ctx, defaultStore, backupProgressDirectory+"/"+filename, bytes.NewReader(descBuf)) - if err != nil { - return errors.Wrap(err, "calculating checksum") - } - - // Write the checksum file after we've successfully wrote the checkpoint. - checksum, err := getChecksum(descBuf) - if err != nil { - return errors.Wrap(err, "calculating checksum") - } - - err = cloud.WriteFile(ctx, defaultStore, backupProgressDirectory+"/"+filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)) - if err != nil { - return err - } - - return nil -} - // planSchedulePTSChaining populates backupDetails with information relevant to // the chaining of protected timestamp records between scheduled backups. 
// Depending on whether backupStmt is a full or incremental backup, we populate @@ -1210,7 +1094,7 @@ func getBackupDetailAndManifest( } defer defaultStore.Close() - if err := checkForPreviousBackup(ctx, defaultStore, defaultURI); err != nil { + if err := backupinfo.CheckForPreviousBackup(ctx, defaultStore, defaultURI); err != nil { return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err } @@ -1219,7 +1103,7 @@ func getBackupDetailAndManifest( mem := execCfg.RootMemoryMonitor.MakeBoundAccount() defer mem.Close(ctx) - prevBackups, encryptionOptions, memSize, err := fetchPreviousBackups(ctx, &mem, user, + prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user, makeCloudStorage, prevs, *initialDetails.EncryptionOptions, kmsEnv) if err != nil { @@ -1357,6 +1241,9 @@ func getTenantInfo( return spans, tenants, nil } +// TODO(adityamaru): We need to move this method into manifest_handling.go but +// the method needs to be decomposed to decouple it from other planning related +// operations. func createBackupManifest( ctx context.Context, execCfg *sql.ExecutorConfig, @@ -1400,7 +1287,7 @@ func createBackupManifest( tables = append(tables, desc) // TODO (anzo): look into the tradeoffs of having all objects in the array to be in the same file, // vs having each object in a separate file, or somewhere in between. 
- statsFiles[desc.GetID()] = backupStatisticsFileName + statsFiles[desc.GetID()] = backupinfo.BackupStatisticsFileName } } @@ -1489,7 +1376,7 @@ func createBackupManifest( CompleteDbs: jobDetails.ResolvedCompleteDbs, Spans: spans, IntroducedSpans: newSpans, - FormatVersion: BackupFormatDescriptorTrackingVersion, + FormatVersion: backupinfo.BackupFormatDescriptorTrackingVersion, BuildInfo: build.GetInfo(), ClusterVersion: execCfg.Settings.Version.ActiveVersion(ctx).Version, ClusterID: execCfg.LogicalClusterID(), diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 661d8b65099d..a9488b952d0b 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" @@ -140,7 +141,7 @@ func TestBackupRestoreStatementResult(t *testing.T) { if err != nil { t.Fatal(err) } - require.True(t, isGZipped(backupManifestBytes)) + require.True(t, backupinfo.IsGZipped(backupManifestBytes)) }) sqlDB.Exec(t, "CREATE DATABASE data2") @@ -297,7 +298,7 @@ func TestBackupRestorePartitioned(t *testing.T) { if err != nil { t.Fatal(err) } - require.True(t, isGZipped(backupPartitionBytes)) + require.True(t, backupinfo.IsGZipped(backupPartitionBytes)) } } } @@ -1535,7 +1536,7 @@ func TestBackupRestoreResume(t *testing.T) { testName string checkpointDirectory string }{ - {testName: "backup-progress-directory", checkpointDirectory: "/" + backupProgressDirectory}, + {testName: "backup-progress-directory", checkpointDirectory: "/" + backupinfo.BackupProgressDirectory}, {testName: 
"backup-base-directory", checkpointDirectory: ""}, } { item := item @@ -1561,7 +1562,7 @@ func TestBackupRestoreResume(t *testing.T) { if err := os.MkdirAll(backupDir+item.checkpointDirectory, 0755); err != nil { t.Fatal(err) } - checkpointFile := backupDir + item.checkpointDirectory + "/" + backupManifestCheckpointName + checkpointFile := backupDir + item.checkpointDirectory + "/" + backupinfo.BackupManifestCheckpointName if err := ioutil.WriteFile(checkpointFile, mockManifest, 0644); err != nil { t.Fatal(err) } @@ -1581,8 +1582,8 @@ func TestBackupRestoreResume(t *testing.T) { if err != nil { t.Fatal(err) } - if isGZipped(backupManifestBytes) { - backupManifestBytes, err = decompressData(ctx, nil, backupManifestBytes) + if backupinfo.IsGZipped(backupManifestBytes) { + backupManifestBytes, err = backupinfo.DecompressData(ctx, nil, backupManifestBytes) require.NoError(t, err) } var backupManifest backuppb.BackupManifest @@ -3918,8 +3919,8 @@ func TestBackupRestoreChecksum(t *testing.T) { if err != nil { t.Fatalf("%+v", err) } - if isGZipped(backupManifestBytes) { - backupManifestBytes, err = decompressData(context.Background(), nil, backupManifestBytes) + if backupinfo.IsGZipped(backupManifestBytes) { + backupManifestBytes, err = backupinfo.DecompressData(context.Background(), nil, backupManifestBytes) require.NoError(t, err) } if err := protoutil.Unmarshal(backupManifestBytes, &backupManifest); err != nil { @@ -5774,8 +5775,7 @@ func TestBackupRestoreCorruptedStatsIgnored(t *testing.T) { statsTable := backuppb.StatsTable{ Statistics: []*stats.TableStatisticProto{{TableID: descpb.ID(tableID + 1), Name: "notbank"}}, } - require.NoError(t, writeTableStatistics(ctx, store, backupStatisticsFileName, - nil /* encryption */, &statsTable)) + require.NoError(t, backupinfo.WriteTableStatistics(ctx, store, nil /* encryption */, &statsTable)) sqlDB.Exec(t, `CREATE DATABASE "data 2"`) sqlDB.Exec(t, fmt.Sprintf(`RESTORE data.bank FROM "%s" WITH skip_missing_foreign_keys, 
into_db = "%s"`, @@ -5783,7 +5783,7 @@ func TestBackupRestoreCorruptedStatsIgnored(t *testing.T) { // Delete the stats file to ensure a restore can succeed even if statistics do // not exist. - require.NoError(t, store.Delete(ctx, backupStatisticsFileName)) + require.NoError(t, store.Delete(ctx, backupinfo.BackupStatisticsFileName)) sqlDB.Exec(t, `CREATE DATABASE "data 3"`) sqlDB.Exec(t, fmt.Sprintf(`RESTORE data.bank FROM "%s" WITH skip_missing_foreign_keys, into_db = "%s"`, dest, "data 3")) @@ -8018,8 +8018,8 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { for i := 0; i < magic; i++ { desc.Files = append(desc.Files, backuppb.BackupManifest_File{Path: fmt.Sprintf("%d-file-%d", i, i)}) } - require.NoError(t, writeBackupManifest(ctx, st, storage, "testmanifest", encOpts, desc)) - _, sz, err := readBackupManifest(ctx, &mem, storage, "testmanifest", encOpts) + require.NoError(t, backupinfo.WriteBackupManifest(ctx, st, storage, "testmanifest", encOpts, desc)) + _, sz, err := backupinfo.ReadBackupManifest(ctx, &mem, storage, "testmanifest", encOpts) require.NoError(t, err) mem.Shrink(ctx, sz) mem.Close(ctx) @@ -8041,7 +8041,7 @@ func TestManifestTooNew(t *testing.T) { manifestPath := filepath.Join(rawDir, "too_new", backupbase.BackupManifestName) manifestData, err := ioutil.ReadFile(manifestPath) require.NoError(t, err) - manifestData, err = decompressData(context.Background(), nil, manifestData) + manifestData, err = backupinfo.DecompressData(context.Background(), nil, manifestData) require.NoError(t, err) var backupManifest backuppb.BackupManifest require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest)) @@ -8052,9 +8052,9 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, err) require.NoError(t, ioutil.WriteFile(manifestPath, manifestData, 0644 /* perm */)) // Also write the checksum file to match the new manifest. 
- checksum, err := getChecksum(manifestData) + checksum, err := backupinfo.GetChecksum(manifestData) require.NoError(t, err) - require.NoError(t, ioutil.WriteFile(manifestPath+backupManifestChecksumSuffix, checksum, 0644 /* perm */)) + require.NoError(t, ioutil.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Verify we reject it. sqlDB.ExpectErr(t, "backup from version 99.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) @@ -8065,9 +8065,9 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, err) require.NoError(t, ioutil.WriteFile(manifestPath, manifestData, 0644 /* perm */)) // Also write the checksum file to match the new manifest. - checksum, err = getChecksum(manifestData) + checksum, err = backupinfo.GetChecksum(manifestData) require.NoError(t, err) - require.NoError(t, ioutil.WriteFile(manifestPath+backupManifestChecksumSuffix, checksum, 0644 /* perm */)) + require.NoError(t, ioutil.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Prove we can restore again. sqlDB.Exec(t, `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) @@ -8079,9 +8079,9 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, err) require.NoError(t, ioutil.WriteFile(manifestPath, manifestData, 0644 /* perm */)) // Also write the checksum file to match the new manifest. - checksum, err = getChecksum(manifestData) + checksum, err = backupinfo.GetChecksum(manifestData) require.NoError(t, err) - require.NoError(t, ioutil.WriteFile(manifestPath+backupManifestChecksumSuffix, checksum, 0644 /* perm */)) + require.NoError(t, ioutil.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Prove we can restore again. 
sqlDB.Exec(t, `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) sqlDB.Exec(t, `DROP DATABASE r1`) @@ -9787,8 +9787,8 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) { require.NoError(t, store.List(ctx, latestFilePath+"/progress/", "", func(f string) error { // Don't double count checkpoints as there will be the manifest and // the checksum. - if !strings.HasSuffix(f, backupManifestChecksumSuffix) { - if strings.HasPrefix(f, backupManifestCheckpointName) { + if !strings.HasSuffix(f, backupinfo.BackupManifestChecksumSuffix) { + if strings.HasPrefix(f, backupinfo.BackupManifestCheckpointName) { actualNumCheckpointsWritten++ } } @@ -9814,7 +9814,7 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) { numCheckpoints := 5 for i := 0; i < numCheckpoints; i++ { - checkpoints = append(checkpoints, newTimestampedCheckpointFileName()) + checkpoints = append(checkpoints, backupinfo.NewTimestampedCheckpointFileName()) // Occasionally, we call newTimestampedCheckpointFileName() in succession // too fast and the timestamp is the same. So wait for a moment to // avoid that. @@ -9914,7 +9914,7 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) { require.NoError(t, err) for _, checkpoint := range checkpoints { var desc []byte - require.NoError(t, cloud.WriteFile(ctx, store, backupProgressDirectory+"/"+checkpoint, bytes.NewReader(desc))) + require.NoError(t, cloud.WriteFile(ctx, store, backupinfo.BackupProgressDirectory+"/"+checkpoint, bytes.NewReader(desc))) } require.NoError(t, err) var actual string diff --git a/pkg/ccl/backupccl/backupbase/constants.go b/pkg/ccl/backupccl/backupbase/constants.go index ff5bf49e43a4..ff1890d47c75 100644 --- a/pkg/ccl/backupccl/backupbase/constants.go +++ b/pkg/ccl/backupccl/backupbase/constants.go @@ -8,13 +8,8 @@ package backupbase +// TODO(adityamaru): Move constants to relevant backupccl packages. const ( - // IncludeManifest is a named const that can be passed to FindPriorBackups. 
- IncludeManifest = true - - // OmitManifest is a named const that can be passed to FindPriorBackups. - OmitManifest = false - // LatestFileName is the name of a file in the collection which contains the // path of the most recently taken full backup in the backup collection. LatestFileName = "LATEST" diff --git a/pkg/ccl/backupccl/backupdest/BUILD.bazel b/pkg/ccl/backupccl/backupdest/BUILD.bazel index 1653cb072fc1..484394f20385 100644 --- a/pkg/ccl/backupccl/backupdest/BUILD.bazel +++ b/pkg/ccl/backupccl/backupdest/BUILD.bazel @@ -10,6 +10,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/backupccl/backupbase", + "//pkg/ccl/backupccl/backupinfo", + "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/backupccl/backuputils", "//pkg/cloud", "//pkg/clusterversion", @@ -24,6 +26,7 @@ go_library( "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/ioctx", + "//pkg/util/mon", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/ccl/backupccl/backupdest/backup_destination.go b/pkg/ccl/backupccl/backupdest/backup_destination.go index b3f27a4d40d9..cefb4f6050b3 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination.go @@ -13,10 +13,13 @@ import ( "encoding/hex" "fmt" "net/url" + "path" "regexp" "strings" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -31,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" 
) @@ -42,6 +46,10 @@ const ( // DefaultLocalityValue is the default locality tag used in a locality aware // backup/restore when an explicit COCKROACH_LOCALITY is not specified. DefaultLocalityValue = "default" + // includeManifest is a named const that can be passed to FindPriorBackups. + includeManifest = true + // OmitManifest is a named const that can be passed to FindPriorBackups. + OmitManifest = false ) // On some cloud storage platforms (i.e. GS, S3), backups in a base bucket may @@ -205,7 +213,7 @@ func ResolveDest( } defer incrementalStore.Close() - priors, err := FindPriorBackups(ctx, incrementalStore, backupbase.OmitManifest) + priors, err := FindPriorBackups(ctx, incrementalStore, OmitManifest) if err != nil { return "", "", "", nil, nil, errors.Wrap(err, "adjusting backup destination to append new layer to existing backup") } @@ -466,3 +474,203 @@ func ListFullBackupsInCollection( } return backupPaths, nil } + +// ResolveBackupManifests resolves the URIs that point to the incremental layers +// (each of which can be partitioned) of backups into the actual backup +// manifests and metadata required to RESTORE. If only one layer is explicitly +// provided, it is inspected to see if it contains "appended" layers internally +// that are then expanded into the result layers returned, similar to if those +// layers had been specified in `from` explicitly. +func ResolveBackupManifests( + ctx context.Context, + mem *mon.BoundAccount, + baseStores []cloud.ExternalStorage, + mkStore cloud.ExternalStorageFromURIFactory, + fullyResolvedBaseDirectory []string, + fullyResolvedIncrementalsDirectory []string, + endTime hlc.Timestamp, + encryption *jobspb.BackupEncryptionOptions, + user username.SQLUsername, +) ( + defaultURIs []string, + // mainBackupManifests contains the manifest located at each defaultURI in the backup chain. 
+ mainBackupManifests []backuppb.BackupManifest, + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + reservedMemSize int64, + _ error, +) { + var ownedMemSize int64 + defer func() { + if ownedMemSize != 0 { + mem.Shrink(ctx, ownedMemSize) + } + }() + baseManifest, memSize, err := backupinfo.ReadBackupManifestFromStore(ctx, mem, baseStores[0], encryption) + if err != nil { + return nil, nil, nil, 0, err + } + ownedMemSize += memSize + + incStores := make([]cloud.ExternalStorage, len(fullyResolvedIncrementalsDirectory)) + for i := range fullyResolvedIncrementalsDirectory { + store, err := mkStore(ctx, fullyResolvedIncrementalsDirectory[i], user) + if err != nil { + return nil, nil, nil, 0, errors.Wrapf(err, "failed to open backup storage location") + } + defer store.Close() + incStores[i] = store + } + + var prev []string + if len(incStores) > 0 { + prev, err = FindPriorBackups(ctx, incStores[0], includeManifest) + if err != nil { + return nil, nil, nil, 0, err + } + } + numLayers := len(prev) + 1 + + defaultURIs = make([]string, numLayers) + mainBackupManifests = make([]backuppb.BackupManifest, numLayers) + localityInfo = make([]jobspb.RestoreDetails_BackupLocalityInfo, numLayers) + + // Setup the full backup layer explicitly. + defaultURIs[0] = fullyResolvedBaseDirectory[0] + mainBackupManifests[0] = baseManifest + localityInfo[0], err = backupinfo.GetLocalityInfo( + ctx, baseStores, fullyResolvedBaseDirectory, baseManifest, encryption, "", /* prefix */ + ) + if err != nil { + return nil, nil, nil, 0, err + } + + // If we discovered additional layers, handle them too. + if numLayers > 1 { + numPartitions := len(fullyResolvedIncrementalsDirectory) + // We need the parsed base URI (<prefix>/<subdir>) for each partition to calculate the + // URI to each layer in that partition below.
+ baseURIs := make([]*url.URL, numPartitions) + for i := range fullyResolvedIncrementalsDirectory { + baseURIs[i], err = url.Parse(fullyResolvedIncrementalsDirectory[i]) + if err != nil { + return nil, nil, nil, 0, err + } + } + + // For each layer, we need to load the default manifest then calculate the URI and the + // locality info for each partition. + for i := range prev { + defaultManifestForLayer, memSize, err := backupinfo.ReadBackupManifest(ctx, mem, incStores[0], prev[i], encryption) + if err != nil { + return nil, nil, nil, 0, err + } + ownedMemSize += memSize + mainBackupManifests[i+1] = defaultManifestForLayer + + // prev[i] is the path to the manifest file itself for layer i -- the + // dirname piece of that path is the subdirectory in each of the + // partitions in which we'll also expect to find a partition manifest. + // Recall full inc URI is <prefix>/<subdir>/<incSubDir> + incSubDir := path.Dir(prev[i]) + partitionURIs := make([]string, numPartitions) + for j := range baseURIs { + u := *baseURIs[j] // NB: makes a copy to avoid mutating the baseURI. + u.Path = backuputils.JoinURLPath(u.Path, incSubDir) + partitionURIs[j] = u.String() + } + defaultURIs[i+1] = partitionURIs[0] + localityInfo[i+1], err = backupinfo.GetLocalityInfo(ctx, incStores, partitionURIs, defaultManifestForLayer, encryption, incSubDir) + if err != nil { + return nil, nil, nil, 0, err + } + } + } + + totalMemSize := ownedMemSize + ownedMemSize = 0 + + validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, err := backupinfo.ValidateEndTimeAndTruncate( + defaultURIs, mainBackupManifests, localityInfo, endTime) + + if err != nil { + return nil, nil, nil, 0, err + } + return validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, totalMemSize, nil +} + +// DeprecatedResolveBackupManifestsExplicitIncrementals reads the +// BACKUP_MANIFEST files from the incremental backup locations that have been +// explicitly provided by the user in `from`.
The method uses the manifest file +// to return the defaultURI, backup manifest, and locality info for each +// incremental layer. +func DeprecatedResolveBackupManifestsExplicitIncrementals( + ctx context.Context, + mem *mon.BoundAccount, + mkStore cloud.ExternalStorageFromURIFactory, + from [][]string, + endTime hlc.Timestamp, + encryption *jobspb.BackupEncryptionOptions, + user username.SQLUsername, +) ( + defaultURIs []string, + // mainBackupManifests contains the manifest located at each defaultURI in the backup chain. + mainBackupManifests []backuppb.BackupManifest, + localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + reservedMemSize int64, + _ error, +) { + // If explicit incremental backups are passed, we simply load them one + // by one as specified and return the results. + var ownedMemSize int64 + defer func() { + if ownedMemSize != 0 { + mem.Shrink(ctx, ownedMemSize) + } + }() + + defaultURIs = make([]string, len(from)) + localityInfo = make([]jobspb.RestoreDetails_BackupLocalityInfo, len(from)) + mainBackupManifests = make([]backuppb.BackupManifest, len(from)) + + var err error + for i, uris := range from { + // The first URI in the list must contain the main BACKUP manifest.
+ defaultURIs[i] = uris[0] + + stores := make([]cloud.ExternalStorage, len(uris)) + for j := range uris { + stores[j], err = mkStore(ctx, uris[j], user) + if err != nil { + return nil, nil, nil, 0, errors.Wrapf(err, "export configuration") + } + defer stores[j].Close() + } + + var memSize int64 + mainBackupManifests[i], memSize, err = backupinfo.ReadBackupManifestFromStore(ctx, mem, stores[0], encryption) + if err != nil { + return nil, nil, nil, 0, err + } + ownedMemSize += memSize + + if len(uris) > 1 { + localityInfo[i], err = backupinfo.GetLocalityInfo( + ctx, stores, uris, mainBackupManifests[i], encryption, "", /* prefix */ + ) + if err != nil { + return nil, nil, nil, 0, err + } + } + } + + totalMemSize := ownedMemSize + ownedMemSize = 0 + + validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, err := + backupinfo.ValidateEndTimeAndTruncate(defaultURIs, mainBackupManifests, localityInfo, endTime) + + if err != nil { + return nil, nil, nil, 0, err + } + return validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, totalMemSize, nil +} diff --git a/pkg/ccl/backupccl/backupdest/incrementals_test.go b/pkg/ccl/backupccl/backupdest/incrementals_test.go index 775c75eec6b3..6c9a1f00987d 100644 --- a/pkg/ccl/backupccl/backupdest/incrementals_test.go +++ b/pkg/ccl/backupccl/backupdest/incrementals_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package backupdest +package backupdest_test import ( "testing" diff --git a/pkg/ccl/backupccl/backupencryption/encryption.go b/pkg/ccl/backupccl/backupencryption/encryption.go index 86d2973c23ef..d08e2c434293 100644 --- a/pkg/ccl/backupccl/backupencryption/encryption.go +++ b/pkg/ccl/backupccl/backupencryption/encryption.go @@ -32,6 +32,13 @@ const ( // backupEncryptionInfoFile is the file name used to store the serialized // EncryptionInfo proto while the backup is in progress. 
backupEncryptionInfoFile = "ENCRYPTION-INFO" + + // BackupOptEncKMS is the option name in a BACKUP statement to specify a KMS + // URI for encryption. + BackupOptEncKMS = "kms" + // BackupOptEncPassphrase is the option name in a BACKUP statement to specify + // a passphrase for encryption. + BackupOptEncPassphrase = "encryption_passphrase" ) // ErrEncryptionInfoRead is a special error returned when the ENCRYPTION-INFO diff --git a/pkg/ccl/backupccl/backupinfo/BUILD.bazel b/pkg/ccl/backupccl/backupinfo/BUILD.bazel new file mode 100644 index 000000000000..ddde5478b66d --- /dev/null +++ b/pkg/ccl/backupccl/backupinfo/BUILD.bazel @@ -0,0 +1,87 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//pkg/testutils/buildutil:buildutil.bzl", "disallowed_imports_test") + +go_library( + name = "backupinfo", + srcs = [ + "backup_metadata.go", + "manifest_handling.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/backupccl/backupbase", + "//pkg/ccl/backupccl/backupencryption", + "//pkg/ccl/backupccl/backuppb", + "//pkg/ccl/backupccl/backuputils", + "//pkg/ccl/storageccl", + "//pkg/cloud", + "//pkg/clusterversion", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/security/username", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descbuilder", + "//pkg/sql/catalog/descpb", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/protoreflect", + "//pkg/sql/sem/tree", + "//pkg/sql/stats", + "//pkg/storage", + "//pkg/util/ctxgroup", + "//pkg/util/encoding", + "//pkg/util/hlc", + "//pkg/util/ioctx", + "//pkg/util/json", + "//pkg/util/mon", + "//pkg/util/protoutil", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "backupinfo_test", + srcs = [ + "backup_metadata_test.go", + "main_test.go", + ], 
+ embed = [":backupinfo"], + deps = [ + "//pkg/base", + "//pkg/blobs", + "//pkg/ccl/backupccl/backupbase", + "//pkg/ccl/backupccl/backuppb", + "//pkg/ccl/backupccl/backuputils", + "//pkg/ccl/utilccl", + "//pkg/cloud", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/security/username", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/catalog/descpb", + "//pkg/sql/stats", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/ioctx", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/protoutil", + "//pkg/util/randutil", + "@com_github_stretchr_testify//require", + ], +) + +disallowed_imports_test( + "backupinfo", + ["//pkg/ccl/backupccl/backupdest"], +) diff --git a/pkg/ccl/backupccl/backup_metadata.go b/pkg/ccl/backupccl/backupinfo/backup_metadata.go similarity index 98% rename from pkg/ccl/backupccl/backup_metadata.go rename to pkg/ccl/backupccl/backupinfo/backup_metadata.go index b03ab6e96c83..ac074f8e023c 100644 --- a/pkg/ccl/backupccl/backup_metadata.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package backupccl +package backupinfo import ( "bytes" @@ -35,8 +35,11 @@ import ( ) const ( - metadataSSTName = "metadata.sst" - fileInfoPath = "fileinfo.sst" + // MetadataSSTName is the name of the SST file containing the backup metadata. + MetadataSSTName = "metadata.sst" + // FileInfoPath is the name of the SST file containing the + // BackupManifest_Files of the backup. + FileInfoPath = "fileinfo.sst" sstBackupKey = "backup" sstDescsPrefix = "desc/" sstFilesPrefix = "file/" @@ -46,7 +49,10 @@ const ( sstTenantsPrefix = "tenant/" ) -func writeBackupMetadataSST( +// WriteBackupMetadataSST is responsible for constructing and writing the +// `metadata.sst` to dest. This file contains the metadata corresponding to this +// backup. 
+func WriteBackupMetadataSST( ctx context.Context, dest cloud.ExternalStorage, enc *jobspb.BackupEncryptionOptions, @@ -62,7 +68,7 @@ func writeBackupMetadataSST( } }() - w, err := makeWriter(ctx, dest, metadataSSTName, enc) + w, err := makeWriter(ctx, dest, MetadataSSTName, enc) if err != nil { return err } @@ -125,7 +131,7 @@ func constructMetadataSST( return err } - if err := writeFilesToMetadata(ctx, sst, m, dest, enc, fileInfoPath); err != nil { + if err := writeFilesToMetadata(ctx, sst, m, dest, enc, FileInfoPath); err != nil { return err } @@ -783,7 +789,8 @@ type BackupMetadata struct { filename string } -func newBackupMetadata( +// NewBackupMetadata returns a new BackupMetadata instance. +func NewBackupMetadata( ctx context.Context, exportStore cloud.ExternalStorage, sstFileName string, diff --git a/pkg/ccl/backupccl/backup_metadata_test.go b/pkg/ccl/backupccl/backupinfo/backup_metadata_test.go similarity index 87% rename from pkg/ccl/backupccl/backup_metadata_test.go rename to pkg/ccl/backupccl/backupinfo/backup_metadata_test.go index b86715328244..c3d86ee7b2f1 100644 --- a/pkg/ccl/backupccl/backup_metadata_test.go +++ b/pkg/ccl/backupccl/backupinfo/backup_metadata_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package backupccl +package backupinfo_test import ( "context" @@ -16,7 +16,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -39,7 +41,8 @@ func TestMetadataSST(t *testing.T) { ctx := context.Background() const numAccounts = 1 userfile := 
"userfile:///0" - tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication) + tc, sqlDB, _, cleanupFn := backuputils.BackupRestoreTestSetup(t, backuputils.SingleNode, numAccounts, + backuputils.InitManualReplication) defer cleanupFn() // Check that backup metadata is correct on full cluster backup. @@ -86,7 +89,7 @@ func checkMetadata( t.Fatal(err) } - bm, err := newBackupMetadata(ctx, store, metadataSSTName, nil) + bm, err := backupinfo.NewBackupMetadata(ctx, store, backupinfo.MetadataSSTName, nil) if err != nil { t.Fatal(err) } @@ -110,7 +113,7 @@ func checkMetadata( checkStats(ctx, t, store, m, bm) } -func checkManifest(t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata) { +func checkManifest(t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata) { expectedManifest := *m expectedManifest.Descriptors = nil expectedManifest.DescriptorChanges = nil @@ -124,7 +127,7 @@ func checkManifest(t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata) } func checkDescriptors( - ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata, + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaDescs []descpb.Descriptor var desc descpb.Descriptor @@ -143,7 +146,7 @@ func checkDescriptors( } func checkDescriptorChanges( - ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata, + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaRevs []backuppb.BackupManifest_DescriptorRevision var rev backuppb.BackupManifest_DescriptorRevision @@ -165,7 +168,9 @@ func checkDescriptorChanges( require.Equal(t, m.DescriptorChanges, metaRevs) } -func checkFiles(ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata) { +func checkFiles( + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, +) { var 
metaFiles []backuppb.BackupManifest_File var file backuppb.BackupManifest_File it := bm.FileIter(ctx) @@ -181,7 +186,9 @@ func checkFiles(ctx context.Context, t *testing.T, m *backuppb.BackupManifest, b require.Equal(t, m.Files, metaFiles) } -func checkSpans(ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata) { +func checkSpans( + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, +) { var metaSpans []roachpb.Span var span roachpb.Span it := bm.SpanIter(ctx) @@ -198,7 +205,7 @@ func checkSpans(ctx context.Context, t *testing.T, m *backuppb.BackupManifest, b } func checkIntroducedSpans( - ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata, + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaSpans []roachpb.Span var span roachpb.Span @@ -215,7 +222,7 @@ func checkIntroducedSpans( } func checkTenants( - ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *BackupMetadata, + ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata, ) { var metaTenants []descpb.TenantInfoWithUsage var tenant descpb.TenantInfoWithUsage @@ -237,9 +244,9 @@ func checkStats( t *testing.T, store cloud.ExternalStorage, m *backuppb.BackupManifest, - bm *BackupMetadata, + bm *backupinfo.BackupMetadata, ) { - expectedStats, err := getStatisticsFromBackup(ctx, store, nil, *m) + expectedStats, err := backupinfo.GetStatisticsFromBackup(ctx, store, nil, *m) if err != nil { t.Fatal(err) } @@ -271,8 +278,8 @@ func testingReadBackupManifest( if err != nil { return nil, err } - if isGZipped(bytes) { - descBytes, err := decompressData(ctx, nil, bytes) + if backupinfo.IsGZipped(bytes) { + descBytes, err := backupinfo.DecompressData(ctx, nil, bytes) if err != nil { return nil, err } diff --git a/pkg/ccl/backupccl/backupinfo/main_test.go b/pkg/ccl/backupccl/backupinfo/main_test.go new file mode 100644 
index 000000000000..2ed06175c9c9 --- /dev/null +++ b/pkg/ccl/backupccl/backupinfo/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupinfo + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer utilccl.TestingEnableEnterprise()() + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/ccl/backupccl/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go similarity index 75% rename from pkg/ccl/backupccl/manifest_handling.go rename to pkg/ccl/backupccl/backupinfo/manifest_handling.go index 8847b917104d..0562bac3ea3a 100644 --- a/pkg/ccl/backupccl/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package backupccl +package backupinfo import ( "bytes" @@ -15,29 +15,30 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "net/url" "path" "sort" "strings" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" - 
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -51,40 +52,38 @@ import ( // Files that may appear in a backup directory. const ( - // backupManifestChecksumSuffix indicates where the checksum for the manifest + // BackupManifestChecksumSuffix indicates where the checksum for the manifest // is stored if present. It can be found in the name of the backup manifest + // this suffix. - backupManifestChecksumSuffix = "-CHECKSUM" + BackupManifestChecksumSuffix = "-CHECKSUM" - // backupPartitionDescriptorPrefix is the file name prefix for serialized - // BackupPartitionDescriptor protos. 
- backupPartitionDescriptorPrefix = "BACKUP_PART" - // backupManifestCheckpointName is the file name used to store the serialized + // BackupManifestCheckpointName is the file name used to store the serialized // BackupManifest proto while the backup is in progress. - backupManifestCheckpointName = "BACKUP-CHECKPOINT" - // backupStatisticsFileName is the file name used to store the serialized + BackupManifestCheckpointName = "BACKUP-CHECKPOINT" + + // BackupStatisticsFileName is the file name used to store the serialized // table statistics for the tables being backed up. - backupStatisticsFileName = "BACKUP-STATISTICS" -) + BackupStatisticsFileName = "BACKUP-STATISTICS" -const ( // BackupFormatDescriptorTrackingVersion added tracking of complete DBs. BackupFormatDescriptorTrackingVersion uint32 = 1 - // backupProgressDirectory is the directory where all 22.1 and beyond + // BackupProgressDirectory is the directory where all 22.1 and beyond // CHECKPOINT files will be stored as we no longer want to overwrite // them. - backupProgressDirectory = "progress" + BackupProgressDirectory = "progress" ) -var writeMetadataSST = settings.RegisterBoolSetting( +// WriteMetadataSST controls if we write the experimental new format BACKUP +// metadata file. +var WriteMetadataSST = settings.RegisterBoolSetting( settings.TenantWritable, "kv.bulkio.write_metadata_sst.enabled", "write experimental new format BACKUP metadata file", true, ) -// isGZipped detects whether the given bytes represent GZipped data. This check +// IsGZipped detects whether the given bytes represent GZipped data. This check // is used rather than a standard implementation such as http.DetectContentType // since some zipped data may be mis-identified by that method. We've seen // gzipped data incorrectly identified as "application/vnd.ms-fontobject". 
The @@ -93,7 +92,7 @@ var writeMetadataSST = settings.RegisterBoolSetting( // // This method is only used to detect if protobufs are GZipped, and there are no // conflicts between the starting bytes of a protobuf and these magic bytes. -func isGZipped(dat []byte) bool { +func IsGZipped(dat []byte) bool { gzipPrefix := []byte("\x1F\x8B\x08") return bytes.HasPrefix(dat, gzipPrefix) } @@ -138,10 +137,10 @@ func ReadBackupManifestFromStore( exportStore cloud.ExternalStorage, encryption *jobspb.BackupEncryptionOptions, ) (backuppb.BackupManifest, int64, error) { - backupManifest, memSize, err := readBackupManifest(ctx, mem, exportStore, backupbase.BackupManifestName, + backupManifest, memSize, err := ReadBackupManifest(ctx, mem, exportStore, backupbase.BackupManifestName, encryption) if err != nil { - oldManifest, newMemSize, newErr := readBackupManifest(ctx, mem, exportStore, backupbase.BackupOldManifestName, + oldManifest, newMemSize, newErr := ReadBackupManifest(ctx, mem, exportStore, backupbase.BackupOldManifestName, encryption) if newErr != nil { return backuppb.BackupManifest{}, 0, err @@ -169,9 +168,8 @@ func compressData(descBuf []byte) ([]byte, error) { return gzipBuf.Bytes(), nil } -// decompressData decompresses gzip data buffer and -// returns decompressed bytes. -func decompressData(ctx context.Context, mem *mon.BoundAccount, descBytes []byte) ([]byte, error) { +// DecompressData decompresses gzip data buffer and returns decompressed bytes. 
+func DecompressData(ctx context.Context, mem *mon.BoundAccount, descBytes []byte) ([]byte, error) { r, err := gzip.NewReader(bytes.NewBuffer(descBytes)) if err != nil { return nil, err @@ -180,9 +178,9 @@ func decompressData(ctx context.Context, mem *mon.BoundAccount, descBytes []byte return mon.ReadAll(ctx, ioctx.ReaderAdapter(r), mem) } -// readBackupCheckpointManifest reads and unmarshals a BACKUP-CHECKPOINT +// ReadBackupCheckpointManifest reads and unmarshals a BACKUP-CHECKPOINT // manifest from filename in the provided export store. -func readBackupCheckpointManifest( +func ReadBackupCheckpointManifest( ctx context.Context, mem *mon.BoundAccount, exportStore cloud.ExternalStorage, @@ -197,7 +195,7 @@ func readBackupCheckpointManifest( // Look for a checksum, if one is not found it could be an older backup, // but we want to continue anyway. - checksumFile, err := readLatestCheckpointFile(ctx, exportStore, filename+backupManifestChecksumSuffix) + checksumFile, err := readLatestCheckpointFile(ctx, exportStore, filename+BackupManifestChecksumSuffix) if err != nil { if !errors.Is(err, cloud.ErrFileDoesNotExist) { return backuppb.BackupManifest{}, 0, err @@ -209,9 +207,9 @@ func readBackupCheckpointManifest( return readManifest(ctx, mem, exportStore, encryption, checkpointFile, checksumFile) } -// readBackupManifest reads and unmarshals a BackupManifest from filename in the +// ReadBackupManifest reads and unmarshals a BackupManifest from filename in the // provided export store. -func readBackupManifest( +func ReadBackupManifest( ctx context.Context, mem *mon.BoundAccount, exportStore cloud.ExternalStorage, @@ -226,7 +224,7 @@ func readBackupManifest( // Look for a checksum, if one is not found it could be an older backup, // but we want to continue anyway. 
- checksumFile, err := exportStore.ReadFile(ctx, filename+backupManifestChecksumSuffix) + checksumFile, err := exportStore.ReadFile(ctx, filename+BackupManifestChecksumSuffix) if err != nil { if !errors.Is(err, cloud.ErrFileDoesNotExist) { return backuppb.BackupManifest{}, 0, err @@ -265,7 +263,7 @@ func readManifest( if err != nil { return backuppb.BackupManifest{}, 0, errors.Wrap(err, "reading checksum file") } - checksum, err := getChecksum(descBytes) + checksum, err := GetChecksum(descBytes) if err != nil { return backuppb.BackupManifest{}, 0, errors.Wrap(err, "calculating checksum of manifest") } @@ -290,8 +288,8 @@ func readManifest( descBytes = plaintextBytes } - if isGZipped(descBytes) { - decompressedBytes, err := decompressData(ctx, mem, descBytes) + if IsGZipped(descBytes) { + decompressedBytes, err := DecompressData(ctx, mem, descBytes) if err != nil { return backuppb.BackupManifest{}, 0, errors.Wrap( err, "decompressing backup manifest") @@ -314,7 +312,7 @@ func readManifest( if encryption == nil && storageccl.AppearsEncrypted(descBytes) { return backuppb.BackupManifest{}, 0, errors.Wrapf( err, "file appears encrypted -- try specifying one of \"%s\" or \"%s\"", - backupOptEncPassphrase, backupOptEncKMS) + backupencryption.BackupOptEncPassphrase, backupencryption.BackupOptEncKMS) } return backuppb.BackupManifest{}, 0, err } @@ -374,8 +372,8 @@ func readBackupPartitionDescriptor( descBytes = plaintextData } - if isGZipped(descBytes) { - decompressedData, err := decompressData(ctx, mem, descBytes) + if IsGZipped(descBytes) { + decompressedData, err := DecompressData(ctx, mem, descBytes) if err != nil { return backuppb.BackupPartitionDescriptor{}, 0, errors.Wrap( err, "decompressing backup partition descriptor") @@ -434,7 +432,39 @@ func readTableStatistics( return &tableStats, err } -func writeBackupManifest( +// GetStatisticsFromBackup retrieves Statistics from backup manifest, +// either through the Statistics field or from the files. 
+func GetStatisticsFromBackup( + ctx context.Context, + exportStore cloud.ExternalStorage, + encryption *jobspb.BackupEncryptionOptions, + backup backuppb.BackupManifest, +) ([]*stats.TableStatisticProto, error) { + // This part deals with pre-20.2 stats format where backup statistics + // are stored as a field in backup manifests instead of in their + // individual files. + if backup.DeprecatedStatistics != nil { + return backup.DeprecatedStatistics, nil + } + tableStatistics := make([]*stats.TableStatisticProto, 0, len(backup.StatisticsFilenames)) + uniqueFileNames := make(map[string]struct{}) + for _, fname := range backup.StatisticsFilenames { + if _, exists := uniqueFileNames[fname]; !exists { + uniqueFileNames[fname] = struct{}{} + myStatsTable, err := readTableStatistics(ctx, exportStore, fname, encryption) + if err != nil { + return tableStatistics, err + } + tableStatistics = append(tableStatistics, myStatsTable.Statistics...) + } + } + + return tableStatistics, nil +} + +// WriteBackupManifest compresses and writes the passed in BackupManifest `desc` +// to `exportStore`. +func WriteBackupManifest( ctx context.Context, settings *cluster.Settings, exportStore cloud.ExternalStorage, @@ -470,20 +500,20 @@ func writeBackupManifest( } // Write the checksum file after we've successfully wrote the manifest. - checksum, err := getChecksum(descBuf) + checksum, err := GetChecksum(descBuf) if err != nil { return errors.Wrap(err, "calculating checksum") } - if err := cloud.WriteFile(ctx, exportStore, filename+backupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil { + if err := cloud.WriteFile(ctx, exportStore, filename+BackupManifestChecksumSuffix, bytes.NewReader(checksum)); err != nil { return errors.Wrap(err, "writing manifest checksum") } return nil } -// getChecksum returns a 32 bit keyed-checksum for the given data. -func getChecksum(data []byte) ([]byte, error) { +// GetChecksum returns a 32 bit keyed-checksum for the given data. 
+func GetChecksum(data []byte) ([]byte, error) { const checksumSizeBytes = 4 hash := sha256.New() if _, err := hash.Write(data); err != nil { @@ -493,10 +523,10 @@ func getChecksum(data []byte) ([]byte, error) { return hash.Sum(nil)[:checksumSizeBytes], nil } -// writeBackupPartitionDescriptor writes metadata (containing a locality KV and +// WriteBackupPartitionDescriptor writes metadata (containing a locality KV and // partial file listing) for a partitioned BACKUP to one of the stores in the // backup. -func writeBackupPartitionDescriptor( +func WriteBackupPartitionDescriptor( ctx context.Context, exportStore cloud.ExternalStorage, filename string, @@ -526,13 +556,12 @@ func writeBackupPartitionDescriptor( return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(descBuf)) } -// writeTableStatistics writes a StatsTable object to a file of the filename +// WriteTableStatistics writes a StatsTable object to a file of the filename // to the specified exportStore. It will be encrypted according to the encryption // option given. -func writeTableStatistics( +func WriteTableStatistics( ctx context.Context, exportStore cloud.ExternalStorage, - filename string, encryption *jobspb.BackupEncryptionOptions, stats *backuppb.StatsTable, ) error { @@ -551,10 +580,15 @@ func writeTableStatistics( return err } } - return cloud.WriteFile(ctx, exportStore, filename, bytes.NewReader(statsBuf)) + return cloud.WriteFile(ctx, exportStore, BackupStatisticsFileName, bytes.NewReader(statsBuf)) } -func loadBackupManifests( +// LoadBackupManifests reads and returns the BackupManifests at the +// ExternalStorage locations in `uris`. +// +// The caller is responsible for shrinking `mem` by the returned size once they +// are done with the returned manifests. 
+func LoadBackupManifests( ctx context.Context, mem *mon.BoundAccount, uris []string, @@ -587,12 +621,14 @@ func loadBackupManifests( return backupManifests, memSize, nil } -var errLocalityDescriptor = errors.New(`Locality Descriptor not found`) +// ErrLocalityDescriptor is the sentinel error that is thrown when a locality +// descriptor is not found. +var ErrLocalityDescriptor = errors.New(`Locality Descriptor not found`) -// getLocalityInfo takes a list of stores and their URIs, along with the main +// GetLocalityInfo takes a list of stores and their URIs, along with the main // backup manifest searches each for the locality pieces listed in the the // main manifest, returning the mapping. -func getLocalityInfo( +func GetLocalityInfo( ctx context.Context, stores []cloud.ExternalStorage, uris []string, @@ -640,217 +676,24 @@ func getLocalityInfo( } } if !found { - return info, errors.Mark(errors.Newf("expected manifest %s not found in backup locations", filename), errLocalityDescriptor) + return info, errors.Mark(errors.Newf("expected manifest %s not found in backup locations", filename), ErrLocalityDescriptor) } } info.URIsByOriginalLocalityKV = urisByOrigLocality return info, nil } -func resolveBackupManifestsExplicitIncrementals( - ctx context.Context, - mem *mon.BoundAccount, - mkStore cloud.ExternalStorageFromURIFactory, - from [][]string, - endTime hlc.Timestamp, - encryption *jobspb.BackupEncryptionOptions, - user username.SQLUsername, -) ( - defaultURIs []string, - // mainBackupManifests contains the manifest located at each defaultURI in the backup chain. - mainBackupManifests []backuppb.BackupManifest, - localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, - reservedMemSize int64, - _ error, -) { - // If explicit incremental backups were are passed, we simply load them one - // by one as specified and return the results. 
- var ownedMemSize int64 - defer func() { - if ownedMemSize != 0 { - mem.Shrink(ctx, ownedMemSize) - } - }() - - defaultURIs = make([]string, len(from)) - localityInfo = make([]jobspb.RestoreDetails_BackupLocalityInfo, len(from)) - mainBackupManifests = make([]backuppb.BackupManifest, len(from)) - - var err error - for i, uris := range from { - // The first URI in the list must contain the main BACKUP manifest. - defaultURIs[i] = uris[0] - - stores := make([]cloud.ExternalStorage, len(uris)) - for j := range uris { - stores[j], err = mkStore(ctx, uris[j], user) - if err != nil { - return nil, nil, nil, 0, errors.Wrapf(err, "export configuration") - } - defer stores[j].Close() - } - - var memSize int64 - mainBackupManifests[i], memSize, err = ReadBackupManifestFromStore(ctx, mem, stores[0], encryption) - if err != nil { - return nil, nil, nil, 0, err - } - ownedMemSize += memSize - - if len(uris) > 1 { - localityInfo[i], err = getLocalityInfo( - ctx, stores, uris, mainBackupManifests[i], encryption, "", /* prefix */ - ) - if err != nil { - return nil, nil, nil, 0, err - } - } - } - - totalMemSize := ownedMemSize - ownedMemSize = 0 - - validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, err := validateEndTimeAndTruncate( - defaultURIs, mainBackupManifests, localityInfo, endTime) - - if err != nil { - return nil, nil, nil, 0, err - } - return validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, totalMemSize, nil -} - -// resolveBackupManifests resolves the URIs that point to the incremental layers -// (each of which can be partitioned) of backups into the actual backup -// manifests and metadata required to RESTORE. If only one layer is explicitly -// provided, it is inspected to see if it contains "appended" layers internally -// that are then expanded into the result layers returned, similar to if those -// layers had been specified in `from` explicitly. 
-func resolveBackupManifests( - ctx context.Context, - mem *mon.BoundAccount, - baseStores []cloud.ExternalStorage, - mkStore cloud.ExternalStorageFromURIFactory, - fullyResolvedBaseDirectory []string, - fullyResolvedIncrementalsDirectory []string, - endTime hlc.Timestamp, - encryption *jobspb.BackupEncryptionOptions, - user username.SQLUsername, -) ( - defaultURIs []string, - // mainBackupManifests contains the manifest located at each defaultURI in the backup chain. - mainBackupManifests []backuppb.BackupManifest, - localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, - reservedMemSize int64, - _ error, -) { - var ownedMemSize int64 - defer func() { - if ownedMemSize != 0 { - mem.Shrink(ctx, ownedMemSize) - } - }() - baseManifest, memSize, err := ReadBackupManifestFromStore(ctx, mem, baseStores[0], encryption) - if err != nil { - return nil, nil, nil, 0, err - } - ownedMemSize += memSize - - incStores := make([]cloud.ExternalStorage, len(fullyResolvedIncrementalsDirectory)) - for i := range fullyResolvedIncrementalsDirectory { - store, err := mkStore(ctx, fullyResolvedIncrementalsDirectory[i], user) - if err != nil { - return nil, nil, nil, 0, errors.Wrapf(err, "failed to open backup storage location") - } - defer store.Close() - incStores[i] = store - } - - var prev []string - if len(incStores) > 0 { - prev, err = backupdest.FindPriorBackups(ctx, incStores[0], backupbase.IncludeManifest) - if err != nil { - return nil, nil, nil, 0, err - } - } - numLayers := len(prev) + 1 - - defaultURIs = make([]string, numLayers) - mainBackupManifests = make([]backuppb.BackupManifest, numLayers) - localityInfo = make([]jobspb.RestoreDetails_BackupLocalityInfo, numLayers) - - // Setup the full backup layer explicitly. 
- defaultURIs[0] = fullyResolvedBaseDirectory[0] - mainBackupManifests[0] = baseManifest - localityInfo[0], err = getLocalityInfo( - ctx, baseStores, fullyResolvedBaseDirectory, baseManifest, encryption, "", /* prefix */ - ) - if err != nil { - return nil, nil, nil, 0, err - } - - // If we discovered additional layers, handle them too. - if numLayers > 1 { - numPartitions := len(fullyResolvedIncrementalsDirectory) - // We need the parsed base URI (/) for each partition to calculate the - // URI to each layer in that partition below. - baseURIs := make([]*url.URL, numPartitions) - for i := range fullyResolvedIncrementalsDirectory { - baseURIs[i], err = url.Parse(fullyResolvedIncrementalsDirectory[i]) - if err != nil { - return nil, nil, nil, 0, err - } - } - - // For each layer, we need to load the default manifest then calculate the URI and the - // locality info for each partition. - for i := range prev { - defaultManifestForLayer, memSize, err := readBackupManifest(ctx, mem, incStores[0], prev[i], encryption) - if err != nil { - return nil, nil, nil, 0, err - } - ownedMemSize += memSize - mainBackupManifests[i+1] = defaultManifestForLayer - - // prev[i] is the path to the manifest file itself for layer i -- the - // dirname piece of that path is the subdirectory in each of the - // partitions in which we'll also expect to find a partition manifest. - // Recall full inc URI is // - incSubDir := path.Dir(prev[i]) - partitionURIs := make([]string, numPartitions) - for j := range baseURIs { - u := *baseURIs[j] // NB: makes a copy to avoid mutating the baseURI. 
- u.Path = backuputils.JoinURLPath(u.Path, incSubDir) - partitionURIs[j] = u.String() - } - defaultURIs[i+1] = partitionURIs[0] - localityInfo[i+1], err = getLocalityInfo(ctx, incStores, partitionURIs, defaultManifestForLayer, encryption, incSubDir) - if err != nil { - return nil, nil, nil, 0, err - } - } - } - - totalMemSize := ownedMemSize - ownedMemSize = 0 - - validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, err := validateEndTimeAndTruncate( - defaultURIs, mainBackupManifests, localityInfo, endTime) - - if err != nil { - return nil, nil, nil, 0, err - } - return validatedDefaultURIs, validatedMainBackupManifests, validatedLocalityInfo, totalMemSize, nil -} - -func validateEndTimeAndTruncate( +// ValidateEndTimeAndTruncate checks that the requested target time, if +// specified, is valid for the list of incremental backups resolved, truncating +// the results to the backup that contains the target time. +// The method also performs additional sanity checks to ensure the backups cover +// the requested time. +func ValidateEndTimeAndTruncate( defaultURIs []string, mainBackupManifests []backuppb.BackupManifest, localityInfo []jobspb.RestoreDetails_BackupLocalityInfo, endTime hlc.Timestamp, ) ([]string, []backuppb.BackupManifest, []jobspb.RestoreDetails_BackupLocalityInfo, error) { - // Check that the requested target time, if specified, is valid for the list - // of incremental backups resolved, truncating the results to the backup that - // contains the target time. if endTime.IsEmpty() { return defaultURIs, mainBackupManifests, localityInfo, nil } @@ -863,16 +706,15 @@ func validateEndTimeAndTruncate( // Ensure that the backup actually has revision history. if !endTime.Equal(b.EndTime) { if b.MVCCFilter != backuppb.MVCCFilter_All { - const errPrefix = "invalid RESTORE timestamp: restoring to arbitrary time requires that BACKUP for requested time be created with '%s' option." 
+ const errPrefix = "invalid RESTORE timestamp: restoring to arbitrary time requires that BACKUP for requested time be created with 'revision_history' option." if i == 0 { return nil, nil, nil, errors.Errorf( - errPrefix+" nearest backup time is %s", backupOptRevisionHistory, + errPrefix+" nearest backup time is %s", timeutil.Unix(0, b.EndTime.WallTime).UTC(), ) } return nil, nil, nil, errors.Errorf( errPrefix+" nearest BACKUP times are %s or %s", - backupOptRevisionHistory, timeutil.Unix(0, mainBackupManifests[i-1].EndTime.WallTime).UTC(), timeutil.Unix(0, b.EndTime.WallTime).UTC(), ) @@ -898,9 +740,9 @@ func validateEndTimeAndTruncate( ) } -// TODO(anzoteh96): benchmark the performance of different search algorithms, -// e.g. linear search, binary search, reverse linear search. -func getBackupIndexAtTime( +// GetBackupIndexAtTime returns the index of the latest backup in +// `backupManifests` with a StartTime >= asOf. +func GetBackupIndexAtTime( backupManifests []backuppb.BackupManifest, asOf hlc.Timestamp, ) (int, error) { if len(backupManifests) == 0 { @@ -919,7 +761,9 @@ func getBackupIndexAtTime( return backupManifestIndex, nil } -func loadSQLDescsFromBackupsAtTime( +// LoadSQLDescsFromBackupsAtTime returns the Descriptors found in the last +// (latest) backup with a StartTime >= asOf. +func LoadSQLDescsFromBackupsAtTime( backupManifests []backuppb.BackupManifest, asOf hlc.Timestamp, ) ([]catalog.Descriptor, backuppb.BackupManifest) { lastBackupManifest := backupManifests[len(backupManifests)-1] @@ -989,9 +833,9 @@ func loadSQLDescsFromBackupsAtTime( return allDescs, lastBackupManifest } -// sanitizeLocalityKV returns a sanitized version of the input string where all +// SanitizeLocalityKV returns a sanitized version of the input string where all // characters that are not alphanumeric or -, =, or _ are replaced with _. 
-func sanitizeLocalityKV(kv string) string { +func SanitizeLocalityKV(kv string) string { sanitizedKV := make([]byte, len(kv)) for i := 0; i < len(kv); i++ { if (kv[i] >= 'a' && kv[i] <= 'z') || @@ -1005,11 +849,11 @@ func sanitizeLocalityKV(kv string) string { return string(sanitizedKV) } -// checkForPreviousBackup ensures that the target location does not already +// CheckForPreviousBackup ensures that the target location does not already // contain a BACKUP or checkpoint, locking out accidental concurrent operations // on that location. Note that the checkpoint file should be written as soon as // the job actually starts. -func checkForPreviousBackup( +func CheckForPreviousBackup( ctx context.Context, exportStore cloud.ExternalStorage, defaultURI string, ) error { redactedURI := backuputils.RedactURIForErrorMessage(defaultURI) @@ -1027,26 +871,137 @@ func checkForPreviousBackup( redactedURI, backupbase.BackupManifestName) } - r, err = readLatestCheckpointFile(ctx, exportStore, backupManifestCheckpointName) + r, err = readLatestCheckpointFile(ctx, exportStore, BackupManifestCheckpointName) if err == nil { r.Close(ctx) return pgerror.Newf(pgcode.FileAlreadyExists, "%s already contains a %s file (is another operation already in progress?)", - redactedURI, backupManifestCheckpointName) + redactedURI, BackupManifestCheckpointName) } if !errors.Is(err, cloud.ErrFileDoesNotExist) { return errors.Wrapf(err, "%s returned an unexpected error when checking for the existence of %s file", - redactedURI, backupManifestCheckpointName) + redactedURI, BackupManifestCheckpointName) } return nil } -// tempCheckpointFileNameForJob returns temporary filename for backup manifest checkpoint. -func tempCheckpointFileNameForJob(jobID jobspb.JobID) string { - return fmt.Sprintf("%s-%d", backupManifestCheckpointName, jobID) +// TempCheckpointFileNameForJob returns temporary filename for backup manifest checkpoint. 
+func TempCheckpointFileNameForJob(jobID jobspb.JobID) string { + return fmt.Sprintf("%s-%d", BackupManifestCheckpointName, jobID) +} + +// WriteBackupManifestCheckpoint writes a new BACKUP-CHECKPOINT MANIFEST and +// CHECKSUM file. If it is a pure v22.1 cluster or later, it will write a +// timestamped BACKUP-CHECKPOINT to the /progress directory. If it is a mixed +// cluster version, it will write a non timestamped BACKUP-CHECKPOINT to the +// base directory in order to not break backup jobs that resume on a v21.2 node. +func WriteBackupManifestCheckpoint( + ctx context.Context, + storageURI string, + encryption *jobspb.BackupEncryptionOptions, + desc *backuppb.BackupManifest, + execCfg *sql.ExecutorConfig, + user username.SQLUsername, +) error { + defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user) + if err != nil { + return err + } + defer defaultStore.Close() + + sort.Sort(BackupFileDescriptors(desc.Files)) + + descBuf, err := protoutil.Marshal(desc) + if err != nil { + return err + } + + descBuf, err = compressData(descBuf) + if err != nil { + return errors.Wrap(err, "compressing backup manifest") + } + + if encryption != nil { + encryptionKey, err := backupencryption.GetEncryptionKey(ctx, encryption, execCfg.Settings, defaultStore.ExternalIOConf()) + if err != nil { + return err + } + descBuf, err = storageccl.EncryptFile(descBuf, encryptionKey) + if err != nil { + return err + } + } + + // If the cluster is still running on a mixed version, we want to write + // to the base directory instead of the progress directory. That way if + // an old node resumes a backup, it doesn't have to start over. + if !execCfg.Settings.Version.IsActive(ctx, clusterversion.BackupDoesNotOverwriteLatestAndCheckpoint) { + // We want to overwrite the latest checkpoint in the base directory, + // just write to the non versioned BACKUP-CHECKPOINT file. 
+ err = cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName, bytes.NewReader(descBuf)) + if err != nil { + return err + } + + checksum, err := GetChecksum(descBuf) + if err != nil { + return err + } + + return cloud.WriteFile(ctx, defaultStore, BackupManifestCheckpointName+ + BackupManifestChecksumSuffix, bytes.NewReader(checksum)) + } + + // We timestamp the checkpoint files in order to enforce write once backups. + // When the job goes to read these timestamped files, it will List + // the checkpoints and pick the file whose name is lexicographically + // sorted to the top. This will be the last checkpoint we write, for + // details refer to NewTimestampedCheckpointFileName. + filename := NewTimestampedCheckpointFileName() + + // HTTP storage does not support listing and so we cannot rely on the + // above-mentioned List method to return us the latest checkpoint file. + // Instead, we will write a checkpoint once with a well-known filename, + // and teach the job to always reach for that filename in the face of + // a resume. We may lose progress, but this is a cost we are willing + // to pay to uphold write-once semantics. + if defaultStore.Conf().Provider == roachpb.ExternalStorageProvider_http { + // TODO (darryl): We should do this only for file not found or directory + // does not exist errors. As of right now we only specifically wrap + // ReadFile errors for file not found so this is not possible yet. + if r, err := defaultStore.ReadFile(ctx, BackupProgressDirectory+"/"+BackupManifestCheckpointName); err != nil { + // Since we did not find the checkpoint file this is the first time + // we are going to write a checkpoint, so write it with the well + // known filename.
+ filename = BackupManifestCheckpointName + } else { + err = r.Close(ctx) + if err != nil { + return err + } + } + } + + err = cloud.WriteFile(ctx, defaultStore, BackupProgressDirectory+"/"+filename, bytes.NewReader(descBuf)) + if err != nil { + return errors.Wrap(err, "calculating checksum") + } + + // Write the checksum file after we've successfully written the checkpoint. + checksum, err := GetChecksum(descBuf) + if err != nil { + return errors.Wrap(err, "calculating checksum") + } + + err = cloud.WriteFile(ctx, defaultStore, BackupProgressDirectory+"/"+filename+BackupManifestChecksumSuffix, bytes.NewReader(checksum)) + if err != nil { + return err + } + + return nil } // readLatestCheckpointFile returns an ioctx.ReaderCloserCtx of the latest @@ -1065,12 +1020,12 @@ func readLatestCheckpointFile( // We name files such that the most recent checkpoint will always // be at the top, so just grab the first filename. - err = exportStore.List(ctx, backupProgressDirectory, "", func(p string) error { + err = exportStore.List(ctx, BackupProgressDirectory, "", func(p string) error { // The first file returned by List could be either the checkpoint or // checksum file, but we are only concerned with the timestamped prefix. // We resolve if it is a checkpoint or checksum file separately below. p = strings.TrimPrefix(p, "/") - checkpoint = strings.TrimSuffix(p, backupManifestChecksumSuffix) + checkpoint = strings.TrimSuffix(p, BackupManifestChecksumSuffix) checkpointFound = true // We only want the first checkpoint so return an error that it is // done listing. @@ -1081,7 +1036,7 @@ func readLatestCheckpointFile( // directly. This can still fail if it is a mixed cluster and the // checkpoint was written in the base directory.
if errors.Is(err, cloud.ErrListingUnsupported) { - r, err = exportStore.ReadFile(ctx, backupProgressDirectory+"/"+filename) + r, err = exportStore.ReadFile(ctx, BackupProgressDirectory+"/"+filename) // If we found the checkpoint in progress, then don't bother reading // from base, just return the reader. if err == nil { @@ -1092,10 +1047,10 @@ func readLatestCheckpointFile( } if checkpointFound { - if strings.HasSuffix(filename, backupManifestChecksumSuffix) { - return exportStore.ReadFile(ctx, backupProgressDirectory+"/"+checkpoint+backupManifestChecksumSuffix) + if strings.HasSuffix(filename, BackupManifestChecksumSuffix) { + return exportStore.ReadFile(ctx, BackupProgressDirectory+"/"+checkpoint+BackupManifestChecksumSuffix) } - return exportStore.ReadFile(ctx, backupProgressDirectory+"/"+checkpoint) + return exportStore.ReadFile(ctx, BackupProgressDirectory+"/"+checkpoint) } // If the checkpoint wasn't found in the progress directory, then try @@ -1109,22 +1064,22 @@ func readLatestCheckpointFile( } -// newTimestampedCheckpointFileName returns a string of a new checkpoint filename +// NewTimestampedCheckpointFileName returns a string of a new checkpoint filename // with a suffixed version. It returns it in the format of BACKUP-CHECKPOINT- // where version is a hex encoded one's complement of the timestamp. // This means that as long as the supplied timestamp is correct, the filenames // will adhere to a lexicographical/utf-8 ordering such that the most // recent file is at the top. 
-func newTimestampedCheckpointFileName() string { +func NewTimestampedCheckpointFileName() string { var buffer []byte buffer = encoding.EncodeStringDescending(buffer, timeutil.Now().String()) - return fmt.Sprintf("%s-%s", backupManifestCheckpointName, hex.EncodeToString(buffer)) + return fmt.Sprintf("%s-%s", BackupManifestCheckpointName, hex.EncodeToString(buffer)) } // FetchPreviousBackups takes a list of URIs of previous backups and returns // their manifest as well as the encryption options of the first backup in the // chain. -func fetchPreviousBackups( +func FetchPreviousBackups( ctx context.Context, mem *mon.BoundAccount, user username.SQLUsername, diff --git a/pkg/ccl/backupccl/backuputils/testutils.go b/pkg/ccl/backupccl/backuputils/testutils.go index 60bce41cbd88..e52908d31862 100644 --- a/pkg/ccl/backupccl/backuputils/testutils.go +++ b/pkg/ccl/backupccl/backuputils/testutils.go @@ -24,6 +24,8 @@ import ( ) const ( + // SingleNode is the size of a single node test cluster. + SingleNode = 1 // MultiNode is the size of a multi node test cluster. MultiNode = 3 ) @@ -100,3 +102,11 @@ func backupRestoreTestSetupWithParams( return tc, sqlDB, dir, cleanupFn } + +// BackupRestoreTestSetup creates and returns a pre-populated testing +// environment that can be used in backup and restore tests. 
+func BackupRestoreTestSetup( + t testing.TB, clusterSize int, numAccounts int, init func(*testcluster.TestCluster), +) (tc *testcluster.TestCluster, sqlDB *sqlutils.SQLRunner, tempDir string, cleanup func()) { + return backupRestoreTestSetupWithParams(t, clusterSize, numAccounts, init, base.TestClusterArgs{}) +} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index f76461985da1..75edd655d690 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -412,13 +413,13 @@ func loadBackupSQLDescs( details jobspb.RestoreDetails, encryption *jobspb.BackupEncryptionOptions, ) ([]backuppb.BackupManifest, backuppb.BackupManifest, []catalog.Descriptor, int64, error) { - backupManifests, sz, err := loadBackupManifests(ctx, mem, details.URIs, + backupManifests, sz, err := backupinfo.LoadBackupManifests(ctx, mem, details.URIs, p.User(), p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption) if err != nil { return nil, backuppb.BackupManifest{}, nil, 0, err } - allDescs, latestBackupManifest := loadSQLDescsFromBackupsAtTime(backupManifests, details.EndTime) + allDescs, latestBackupManifest := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, details.EndTime) for _, m := range details.DatabaseModifiers { for _, typ := range m.ExtraTypeDescs { @@ -478,36 +479,6 @@ type restoreResumer struct { } } -// getStatisticsFromBackup retrieves Statistics from backup manifest, -// either through the Statistics field or from the files. 
-func getStatisticsFromBackup( - ctx context.Context, - exportStore cloud.ExternalStorage, - encryption *jobspb.BackupEncryptionOptions, - backup backuppb.BackupManifest, -) ([]*stats.TableStatisticProto, error) { - // This part deals with pre-20.2 stats format where backup statistics - // are stored as a field in backup manifests instead of in their - // individual files. - if backup.DeprecatedStatistics != nil { - return backup.DeprecatedStatistics, nil - } - tableStatistics := make([]*stats.TableStatisticProto, 0, len(backup.StatisticsFilenames)) - uniqueFileNames := make(map[string]struct{}) - for _, fname := range backup.StatisticsFilenames { - if _, exists := uniqueFileNames[fname]; !exists { - uniqueFileNames[fname] = struct{}{} - myStatsTable, err := readTableStatistics(ctx, exportStore, fname, encryption) - if err != nil { - return tableStatistics, err - } - tableStatistics = append(tableStatistics, myStatsTable.Statistics...) - } - } - - return tableStatistics, nil -} - // remapRelevantStatistics changes the table ID references in the stats // from those they had in the backed up database to what they should be // in the restored database. 
@@ -1291,7 +1262,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } } - lastBackupIndex, err := getBackupIndexAtTime(backupManifests, details.EndTime) + lastBackupIndex, err := backupinfo.GetBackupIndexAtTime(backupManifests, details.EndTime) if err != nil { return err } @@ -1350,7 +1321,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } } var remappedStats []*stats.TableStatisticProto - backupStats, err := getStatisticsFromBackup(ctx, defaultStore, details.Encryption, + backupStats, err := backupinfo.GetStatisticsFromBackup(ctx, defaultStore, details.Encryption, latestBackupManifest) if err == nil { remappedStats = remapRelevantStatistics(ctx, backupStats, details.DescriptorRewrites, diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 4a637e837e40..c717646a0f24 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1483,15 +1483,16 @@ func doRestorePlan( if len(from) <= 1 { // Incremental layers are not specified explicitly. They will be searched for automatically. // This could be either INTO-syntax, OR TO-syntax. - defaultURIs, mainBackupManifests, localityInfo, memReserved, err = resolveBackupManifests( + defaultURIs, mainBackupManifests, localityInfo, memReserved, err = backupdest.ResolveBackupManifests( ctx, &mem, baseStores, mkStore, fullyResolvedBaseDirectory, fullyResolvedIncrementalsDirectory, endTime, encryption, p.User(), ) } else { // Incremental layers are specified explicitly. // This implies the old, deprecated TO-syntax. 
- defaultURIs, mainBackupManifests, localityInfo, memReserved, err = resolveBackupManifestsExplicitIncrementals( - ctx, &mem, mkStore, from, endTime, encryption, p.User()) + defaultURIs, mainBackupManifests, localityInfo, memReserved, err = + backupdest.DeprecatedResolveBackupManifestsExplicitIncrementals(ctx, &mem, mkStore, from, + endTime, encryption, p.User()) } if err != nil { diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index 5f259c056dcc..dfe7f6407841 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -11,6 +11,7 @@ package backupccl import ( "sort" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -61,7 +62,7 @@ func makeSimpleImportSpans( } for i := range backups { - sort.Sort(BackupFileDescriptors(backups[i].Files)) + sort.Sort(backupinfo.BackupFileDescriptors(backups[i].Files)) } var cover []execinfrapb.RestoreSpanEntry diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index a7bb5ad60af6..394be2cf50ea 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" @@ -163,7 +164,7 @@ func (m metadataSSTInfoReader) showBackup( user username.SQLUsername, resultsCh chan<- tree.Datums, ) error { - filename := metadataSSTName + filename := backupinfo.MetadataSSTName push := func(_, 
readable string, value json.JSON) error { val := tree.DNull if value != nil { @@ -182,7 +183,7 @@ func (m metadataSSTInfoReader) showBackup( return errors.Wrapf(err, "creating external store") } defer store.Close() - if err := DebugDumpMetadataSST(ctx, store, filename, info.enc, push); err != nil { + if err := backupinfo.DebugDumpMetadataSST(ctx, store, filename, info.enc, push); err != nil { return err } } @@ -216,15 +217,15 @@ func showBackupPlanHook( } expected := map[string]sql.KVStringOptValidate{ - backupOptEncPassphrase: sql.KVStringOptRequireValue, - backupOptEncKMS: sql.KVStringOptRequireValue, - backupOptWithPrivileges: sql.KVStringOptRequireNoValue, - backupOptAsJSON: sql.KVStringOptRequireNoValue, - backupOptWithDebugIDs: sql.KVStringOptRequireNoValue, - backupOptIncStorage: sql.KVStringOptRequireValue, - backupOptDebugMetadataSST: sql.KVStringOptRequireNoValue, - backupOptEncDir: sql.KVStringOptRequireValue, - backupOptCheckFiles: sql.KVStringOptRequireNoValue, + backupencryption.BackupOptEncPassphrase: sql.KVStringOptRequireValue, + backupencryption.BackupOptEncKMS: sql.KVStringOptRequireValue, + backupOptWithPrivileges: sql.KVStringOptRequireNoValue, + backupOptAsJSON: sql.KVStringOptRequireNoValue, + backupOptWithDebugIDs: sql.KVStringOptRequireNoValue, + backupOptIncStorage: sql.KVStringOptRequireValue, + backupOptDebugMetadataSST: sql.KVStringOptRequireNoValue, + backupOptEncDir: sql.KVStringOptRequireValue, + backupOptCheckFiles: sql.KVStringOptRequireNoValue, } optsFn, err := p.TypeAsStringOpts(ctx, backup.Options, expected) if err != nil { @@ -334,7 +335,7 @@ func showBackupPlanHook( var encryption *jobspb.BackupEncryptionOptions showEncErr := `If you are running SHOW BACKUP exclusively on an incremental backup, you must pass the 'encryption_info_dir' parameter that points to the directory of your full backup` - if passphrase, ok := opts[backupOptEncPassphrase]; ok { + if passphrase, ok := opts[backupencryption.BackupOptEncPassphrase]; ok { 
opts, err := backupencryption.ReadEncryptionOptions(ctx, encStore) if errors.Is(err, backupencryption.ErrEncryptionInfoRead) { return errors.WithHint(err, showEncErr) @@ -347,7 +348,7 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o Mode: jobspb.EncryptionMode_Passphrase, Key: encryptionKey, } - } else if kms, ok := opts[backupOptEncKMS]; ok { + } else if kms, ok := opts[backupencryption.BackupOptEncKMS]; ok { opts, err := backupencryption.ReadEncryptionOptions(ctx, encStore) if errors.Is(err, backupencryption.ErrEncryptionInfoRead) { return errors.WithHint(err, showEncErr) @@ -418,14 +419,14 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI info.defaultURIs, info.manifests, info.localityInfo, memReserved, - err = resolveBackupManifests( + err = backupdest.ResolveBackupManifests( ctx, &mem, baseStores, mkStore, fullyResolvedDest, fullyResolvedIncrementalsDirectory, hlc.Timestamp{}, encryption, p.User()) defer func() { mem.Shrink(ctx, memReserved) }() if err != nil { - if errors.Is(err, errLocalityDescriptor) && subdir == "" { + if errors.Is(err, backupinfo.ErrLocalityDescriptor) && subdir == "" { p.BufferClientNotice(ctx, pgnotice.Newf("`SHOW BACKUP` using the old syntax ("+ "without the `IN` keyword) on a locality aware backup does not display or validate"+ @@ -518,9 +519,9 @@ func checkBackupFiles( // metadata files ( prefixed with `backupPartitionDescriptorPrefix`) , as // they're validated in resolveBackupManifests. 
for _, metaFile := range []string{ - fileInfoPath, - metadataSSTName, - backupbase.BackupManifestName + backupManifestChecksumSuffix} { + backupinfo.FileInfoPath, + backupinfo.MetadataSSTName, + backupbase.BackupManifestName + backupinfo.BackupManifestChecksumSuffix} { if _, err := defaultStore.Size(ctx, metaFile); err != nil { return nil, errors.Wrapf(err, "Error checking metadata file %s/%s", info.defaultURIs[layer], metaFile) diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index c7bb1744dea6..917eba421858 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -15,6 +15,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver" "github.com/cockroachdb/cockroach/pkg/keys" @@ -340,7 +341,7 @@ func selectTargets( asOf hlc.Timestamp, restoreSystemUsers bool, ) ([]catalog.Descriptor, []catalog.DatabaseDescriptor, []descpb.TenantInfoWithUsage, error) { - allDescs, lastBackupManifest := loadSQLDescsFromBackupsAtTime(backupManifests, asOf) + allDescs, lastBackupManifest := backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, asOf) if descriptorCoverage == tree.AllDescriptors { return fullClusterTargetsRestore(allDescs, lastBackupManifest) @@ -388,7 +389,7 @@ func selectTargets( return nil, nil, nil, errors.Errorf("no tables or databases matched the given targets: %s", tree.ErrString(&targets)) } - if lastBackupManifest.FormatVersion >= BackupFormatDescriptorTrackingVersion { + if lastBackupManifest.FormatVersion >= backupinfo.BackupFormatDescriptorTrackingVersion { if err := matched.CheckExpansions(lastBackupManifest.CompleteDbs); err != nil { return nil, nil, nil, err } @@ -448,7 +449,7 @@ func MakeBackupTableEntry( backupManifests = backupManifests[:ind+1] } - allDescs, _ := loadSQLDescsFromBackupsAtTime(backupManifests, endTime) + allDescs, _ := 
backupinfo.LoadSQLDescsFromBackupsAtTime(backupManifests, endTime) resolver, err := backupresolver.NewDescriptorResolver(allDescs) if err != nil { return BackupTableEntry{}, errors.Wrapf(err, "creating a new resolver for all descriptors") diff --git a/pkg/ccl/backupccl/utils_test.go b/pkg/ccl/backupccl/utils_test.go index 9284c15bfa7f..be4a00fc84f4 100644 --- a/pkg/ccl/backupccl/utils_test.go +++ b/pkg/ccl/backupccl/utils_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -382,7 +383,7 @@ func getSpansFromManifest(ctx context.Context, t *testing.T, backupPath string) backupManifestBytes, err := ioutil.ReadFile(backupPath + "/" + backupbase.BackupManifestName) require.NoError(t, err) var backupManifest backuppb.BackupManifest - decompressedBytes, err := decompressData(ctx, nil, backupManifestBytes) + decompressedBytes, err := backupinfo.DecompressData(ctx, nil, backupManifestBytes) require.NoError(t, err) require.NoError(t, protoutil.Unmarshal(decompressedBytes, &backupManifest)) spans := make([]roachpb.Span, 0, len(backupManifest.Files)) diff --git a/pkg/ccl/cliccl/BUILD.bazel b/pkg/ccl/cliccl/BUILD.bazel index d5d060bf2ebc..7246cb6b70ba 100644 --- a/pkg/ccl/cliccl/BUILD.bazel +++ b/pkg/ccl/cliccl/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/ccl/backupccl", "//pkg/ccl/backupccl/backupbase", "//pkg/ccl/backupccl/backupdest", + "//pkg/ccl/backupccl/backupinfo", "//pkg/ccl/backupccl/backuppb", "//pkg/ccl/backupccl/backuputils", "//pkg/ccl/baseccl", diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index 64f68a8ace98..b8efe3d15fe8 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -28,6 +28,7 @@ import 
( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" @@ -301,7 +302,7 @@ func getManifestFromURI(ctx context.Context, path string) (backuppb.BackupManife // upgraded from the old FK representation, or even older formats). If more // fields are added to the output, the table descriptors may need to be // upgraded. - backupManifest, _, err := backupccl.ReadBackupManifestFromURI(ctx, nil /* mem */, path, username.RootUserName(), + backupManifest, _, err := backupinfo.ReadBackupManifestFromURI(ctx, nil /* mem */, path, username.RootUserName(), externalStorageFromURIFactory, nil) if err != nil { return backuppb.BackupManifest{}, err @@ -404,7 +405,7 @@ func runListIncrementalCmd(cmd *cobra.Command, args []string) error { } defer baseStore.Close() - oldIncPaths, err := backupdest.FindPriorBackups(ctx, baseStore, backupbase.OmitManifest) + oldIncPaths, err := backupdest.FindPriorBackups(ctx, baseStore, backupdest.OmitManifest) if err != nil { return err } @@ -421,7 +422,7 @@ func runListIncrementalCmd(cmd *cobra.Command, args []string) error { } defer incStore.Close() - newIncPaths, err := backupdest.FindPriorBackups(ctx, incStore, backupbase.OmitManifest) + newIncPaths, err := backupdest.FindPriorBackups(ctx, incStore, backupdest.OmitManifest) if err != nil { return err } @@ -439,7 +440,7 @@ func runListIncrementalCmd(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "connect to external storage") } defer stores[i].Close() - manifest, _, err := backupccl.ReadBackupManifestFromStore(ctx, nil /* mem */, stores[i], nil) + manifest, _, err := backupinfo.ReadBackupManifestFromStore(ctx, 
nil /* mem */, stores[i], nil) if err != nil { return err }