Skip to content

Commit

Permalink
backupinfo: introduce a backupinfo package
Browse files Browse the repository at this point in the history
The backupinfo package contains logic related to interacting
with information and metadata describing the backup. After this
change we have `backupdest` depending on `backupinfo`.

Release note: None
  • Loading branch information
adityamaru committed Jun 23, 2022
1 parent 558370b commit a743d82
Show file tree
Hide file tree
Showing 24 changed files with 723 additions and 544 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ ALL_TESTS = [
"//pkg/build/starlarkutil:starlarkutil_test",
"//pkg/build/util:util_test",
"//pkg/ccl/backupccl/backupdest:backupdest_test",
"//pkg/ccl/backupccl/backupinfo:backupinfo_disallowed_imports_test",
"//pkg/ccl/backupccl/backupinfo:backupinfo_test",
"//pkg/ccl/backupccl/backupresolver:backupresolver_test",
"//pkg/ccl/backupccl:backupccl_test",
"//pkg/ccl/baseccl:baseccl_test",
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go_library(
srcs = [
"alter_backup_planning.go",
"backup_job.go",
"backup_metadata.go",
"backup_planning.go",
"backup_planning_tenant.go",
"backup_processor.go",
Expand All @@ -14,7 +13,6 @@ go_library(
"create_scheduled_backup.go",
"file_sst_sink.go",
"key_rewriter.go",
"manifest_handling.go",
"restoration_data.go",
"restore_data_processor.go",
"restore_job.go",
Expand All @@ -37,6 +35,7 @@ go_library(
"//pkg/ccl/backupccl/backupbase",
"//pkg/ccl/backupccl/backupdest",
"//pkg/ccl/backupccl/backupencryption",
"//pkg/ccl/backupccl/backupinfo",
"//pkg/ccl/backupccl/backuppb",
"//pkg/ccl/backupccl/backupresolver",
"//pkg/ccl/backupccl/backuputils",
Expand Down Expand Up @@ -112,10 +111,8 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/eventpb",
Expand Down Expand Up @@ -144,7 +141,6 @@ go_test(
"alter_backup_test.go",
"backup_cloud_test.go",
"backup_intents_test.go",
"backup_metadata_test.go",
"backup_planning_test.go",
"backup_rand_test.go",
"backup_tenant_test.go",
Expand Down Expand Up @@ -178,6 +174,7 @@ go_test(
"//pkg/ccl/backupccl/backupbase",
"//pkg/ccl/backupccl/backupdest",
"//pkg/ccl/backupccl/backupencryption",
"//pkg/ccl/backupccl/backupinfo",
"//pkg/ccl/backupccl/backuppb",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
Expand Down
43 changes: 23 additions & 20 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuputils"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
Expand Down Expand Up @@ -243,7 +244,7 @@ func backup(
RevisionStartTime: backupManifest.RevisionStartTime,
})

err := writeBackupManifestCheckpoint(
err := backupinfo.WriteBackupManifestCheckpoint(
ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(),
)
if err != nil {
Expand Down Expand Up @@ -290,8 +291,8 @@ func backup(
// Set a unique filename for each partition backup descriptor. The ID
// ensures uniqueness, and the kv string appended to the end is for
// readability.
filename := fmt.Sprintf("%s_%d_%s",
backupPartitionDescriptorPrefix, nextPartitionedDescFilenameID, sanitizeLocalityKV(kv))
filename := fmt.Sprintf("%s_%d_%s", backupPartitionDescriptorPrefix,
nextPartitionedDescFilenameID, backupinfo.SanitizeLocalityKV(kv))
nextPartitionedDescFilenameID++
backupManifest.PartitionDescriptorFilenames = append(backupManifest.PartitionDescriptorFilenames, filename)
desc := backuppb.BackupPartitionDescriptor{
Expand All @@ -306,15 +307,16 @@ func backup(
return err
}
defer store.Close()
return writeBackupPartitionDescriptor(ctx, store, filename, encryption, &desc)
return backupinfo.WriteBackupPartitionDescriptor(ctx, store, filename, encryption, &desc)
}(); err != nil {
return roachpb.RowCount{}, err
}
}
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup manifest"})
if err := writeBackupManifest(ctx, settings, defaultStore, backupbase.BackupManifestName, encryption, backupManifest); err != nil {
if err := backupinfo.WriteBackupManifest(ctx, settings, defaultStore, backupbase.BackupManifestName,
encryption, backupManifest); err != nil {
return roachpb.RowCount{}, err
}
var tableStatistics []*stats.TableStatisticProto
Expand Down Expand Up @@ -344,12 +346,12 @@ func backup(
}

resumerSpan.RecordStructured(&types.StringValue{Value: "writing backup table statistics"})
if err := writeTableStatistics(ctx, defaultStore, backupStatisticsFileName, encryption, &statsTable); err != nil {
if err := backupinfo.WriteTableStatistics(ctx, defaultStore, encryption, &statsTable); err != nil {
return roachpb.RowCount{}, err
}

if writeMetadataSST.Get(&settings.SV) {
if err := writeBackupMetadataSST(ctx, defaultStore, encryption, backupManifest, tableStatistics); err != nil {
if backupinfo.WriteMetadataSST.Get(&settings.SV) {
if err := backupinfo.WriteBackupMetadataSST(ctx, defaultStore, encryption, backupManifest, tableStatistics); err != nil {
err = errors.Wrap(err, "writing forward-compat metadata sst")
if !build.IsRelease() {
return roachpb.RowCount{}, err
Expand Down Expand Up @@ -448,7 +450,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

if err := writeBackupManifestCheckpoint(
if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, backupManifest, p.ExecCfg(), p.User(),
); err != nil {
return err
Expand Down Expand Up @@ -730,20 +732,21 @@ func (b *backupResumer) readManifestOnResume(
// they could be using either the new or the old foreign key
// representations. We should just preserve whatever representation the
// table descriptors were using and leave them alone.
desc, memSize, err := readBackupCheckpointManifest(ctx, mem, defaultStore, backupManifestCheckpointName,
details.EncryptionOptions)
desc, memSize, err := backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore,
backupinfo.BackupManifestCheckpointName, details.EncryptionOptions)
if err != nil {
if !errors.Is(err, cloud.ErrFileDoesNotExist) {
return nil, 0, errors.Wrapf(err, "reading backup checkpoint")
}
// Try reading temp checkpoint.
tmpCheckpoint := tempCheckpointFileNameForJob(b.job.ID())
desc, memSize, err = readBackupCheckpointManifest(ctx, mem, defaultStore, tmpCheckpoint, details.EncryptionOptions)
tmpCheckpoint := backupinfo.TempCheckpointFileNameForJob(b.job.ID())
desc, memSize, err = backupinfo.ReadBackupCheckpointManifest(ctx, mem, defaultStore,
tmpCheckpoint, details.EncryptionOptions)
if err != nil {
return nil, 0, err
}
// "Rename" temp checkpoint.
if err := writeBackupManifestCheckpoint(
if err := backupinfo.WriteBackupManifestCheckpoint(
ctx, details.URI, details.EncryptionOptions, &desc, cfg, user,
); err != nil {
mem.Shrink(ctx, memSize)
Expand All @@ -753,8 +756,8 @@ func (b *backupResumer) readManifestOnResume(
if err := defaultStore.Delete(ctx, tmpCheckpoint); err != nil {
log.Errorf(ctx, "error removing temporary checkpoint %s", tmpCheckpoint)
}
if err := defaultStore.Delete(ctx, backupProgressDirectory+"/"+tmpCheckpoint); err != nil {
log.Errorf(ctx, "error removing temporary checkpoint %s", backupProgressDirectory+"/"+tmpCheckpoint)
if err := defaultStore.Delete(ctx, backupinfo.BackupProgressDirectory+"/"+tmpCheckpoint); err != nil {
log.Errorf(ctx, "error removing temporary checkpoint %s", backupinfo.BackupProgressDirectory+"/"+tmpCheckpoint)
}
}

Expand Down Expand Up @@ -850,18 +853,18 @@ func (b *backupResumer) deleteCheckpoint(
defer exportStore.Close()
// We first attempt to delete from base directory to account for older
// backups, and then from the progress directory.
err = exportStore.Delete(ctx, backupManifestCheckpointName)
err = exportStore.Delete(ctx, backupinfo.BackupManifestCheckpointName)
if err != nil {
log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in base directory: %+v", err)
}
err = exportStore.Delete(ctx, backupManifestCheckpointName+backupManifestChecksumSuffix)
err = exportStore.Delete(ctx, backupinfo.BackupManifestCheckpointName+backupinfo.BackupManifestChecksumSuffix)
if err != nil {
log.Warningf(ctx, "unable to delete checkpoint checksum file in base directory: %+v", err)
}
// Delete will not delete a nonempty directory, so we have to go through
// all files and delete each file one by one.
return exportStore.List(ctx, backupProgressDirectory, "", func(p string) error {
return exportStore.Delete(ctx, backupProgressDirectory+p)
return exportStore.List(ctx, backupinfo.BackupProgressDirectory, "", func(p string) error {
return exportStore.Delete(ctx, backupinfo.BackupProgressDirectory+p)
})
}(); err != nil {
log.Warningf(ctx, "unable to delete checkpointed backup descriptor file in progress directory: %+v", err)
Expand Down
Loading

0 comments on commit a743d82

Please sign in to comment.