diff --git a/AUTHORS b/AUTHORS index df99a2be75ac..6f1c26fc9394 100644 --- a/AUTHORS +++ b/AUTHORS @@ -168,6 +168,7 @@ Evan Wall Evgeniy Vasilev fabio Faizan Qazi +Faizaan Madhani fangwens Francis Bergin Fenil Patel diff --git a/build/teamcity/cockroach/nightlies/sqllogic_corpus_nightly.sh b/build/teamcity/cockroach/nightlies/sqllogic_corpus_nightly.sh index fcb3da541d8c..b3a8d84b5082 100755 --- a/build/teamcity/cockroach/nightlies/sqllogic_corpus_nightly.sh +++ b/build/teamcity/cockroach/nightlies/sqllogic_corpus_nightly.sh @@ -8,6 +8,6 @@ source "$dir/teamcity-support.sh" # For $root source "$dir/teamcity-bazel-support.sh" # For run_bazel tc_start_block "Run SQL Logic Test with Declarative Corpus Generation" -BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e TC_BUILD_BRANCH -e GITHUB_API_TOKEN -e GOOGLE_EPHEMERAL_CREDENTIALS -e BUILD_VCS_NUMBER -e TC_BUILD_ID -e TC_SERVER_URL" \ +BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e TC_BUILD_BRANCH -e GITHUB_API_TOKEN -e GOOGLE_EPHEMERAL_CREDENTIALS -e BUILD_VCS_NUMBER -e TC_BUILD_ID -e TC_SERVER_URL -e TC_BUILDTYPE_ID -e GITHUB_REPO" \ run_bazel build/teamcity/cockroach/nightlies/sqllogic_corpus_nightly_impl.sh tc_end_block "Run SQL Logic Test High VModule" diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 5dca580f8a0b..a21df616444b 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -13,6 +13,8 @@ import ( "fmt" "net/url" "path" + "reflect" + "sort" "strings" "time" @@ -790,6 +792,122 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree } } +func getBackupDetailAndManifest( + ctx context.Context, + execCfg *sql.ExecutorConfig, + txn *kv.Txn, + initialDetails jobspb.BackupDetails, + user username.SQLUsername, + backupDestination backupdest.ResolvedDestination, +) (jobspb.BackupDetails, backuppb.BackupManifest, error) { + makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI + + kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig, + execCfg.DB, user, execCfg.InternalExecutor) + + mem := execCfg.RootMemoryMonitor.MakeBoundAccount() + defer mem.Close(ctx) + + prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user, + makeCloudStorage, backupDestination.PrevBackupURIs, *initialDetails.EncryptionOptions, &kmsEnv) + + if err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + defer func() { + mem.Shrink(ctx, memSize) + }() + + if len(prevBackups) > 0 { + baseManifest := prevBackups[0] + if baseManifest.DescriptorCoverage == tree.AllDescriptors && + !initialDetails.FullCluster { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Errorf("cannot append a backup of specific tables or databases to a cluster backup") + } + + if err := requireEnterprise(execCfg, "incremental"); err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + lastEndTime := prevBackups[len(prevBackups)-1].EndTime + if lastEndTime.Compare(initialDetails.EndTime) > 0 { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, + errors.Newf("`AS OF SYSTEM TIME` %s must be greater than "+ + "the previous backup's end time of %s.", + initialDetails.EndTime.GoTime(), lastEndTime.GoTime()) + } + } + + localityKVs := make([]string, len(backupDestination.URIsByLocalityKV)) + i := 0 + for k := range backupDestination.URIsByLocalityKV { + localityKVs[i] = k + i++ + } + + for i := range prevBackups { + prevBackup := prevBackups[i] + // IDs are how we identify 
tables, and those are only meaningful in the + // context of their own cluster, so we need to ensure we only allow + // incremental previous backups that we created. + if fromCluster := prevBackup.ClusterID; !fromCluster.Equal(execCfg.NodeInfo.LogicalClusterID()) { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Newf("previous BACKUP belongs to cluster %s", fromCluster.String()) + } + + prevLocalityKVs := prevBackup.LocalityKVs + + // Checks that each layer in the backup uses the same localities + // Does NOT check that each locality/layer combination is actually at the + // expected locations. + // This is complex right now, but should be easier shortly. + // TODO(benbardin): Support verifying actual existence of localities for + // each layer after deprecating TO-syntax in 22.2 + sort.Strings(localityKVs) + sort.Strings(prevLocalityKVs) + if !(len(localityKVs) == 0 && len(prevLocalityKVs) == 0) && !reflect.DeepEqual(localityKVs, + prevLocalityKVs) { + // Note that this won't verify the default locality. That's not + // necessary, because the default locality defines the backup manifest + // location. If that URI isn't right, the backup chain will fail to + // load. + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Newf( + "Requested backup has localities %s, but a previous backup layer in this collection has localities %s. "+ + "Mismatched backup layers are not supported. Please take a new full backup with the new localities, or an "+ + "incremental backup with matching localities.", + localityKVs, prevLocalityKVs, + ) + } + } + + // updatedDetails and backupManifest should be treated as read-only after + // they're returned from their respective functions. Future changes to those + // objects should be made within those functions. 
+ updatedDetails, err := updateBackupDetails( + ctx, + initialDetails, + backupDestination.CollectionURI, + backupDestination.DefaultURI, + backupDestination.ChosenSubdir, + backupDestination.URIsByLocalityKV, + prevBackups, + encryptionOptions, + &kmsEnv) + if err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + + backupManifest, err := createBackupManifest( + ctx, + execCfg, + txn, + updatedDetails, + prevBackups) + if err != nil { + return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err + } + + return updatedDetails, backupManifest, nil +} + func (b *backupResumer) readManifestOnResume( ctx context.Context, mem *mon.BoundAccount, diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 1ef488490389..c1c0b84bc8e2 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -11,15 +11,12 @@ package backupccl import ( "context" "fmt" - "reflect" - "sort" "strconv" "strings" "time" "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupbase" - "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupencryption" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" @@ -37,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" - "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" @@ -1217,122 +1213,6 @@ func checkForNewTables( return nil } -func getBackupDetailAndManifest( - ctx context.Context, - execCfg *sql.ExecutorConfig, - txn *kv.Txn, - initialDetails jobspb.BackupDetails, - user username.SQLUsername, - backupDestination backupdest.ResolvedDestination, -) (jobspb.BackupDetails, backuppb.BackupManifest, error) { - makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI - - kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig, - execCfg.DB, user, execCfg.InternalExecutor) - - mem := execCfg.RootMemoryMonitor.MakeBoundAccount() - defer mem.Close(ctx) - - prevBackups, encryptionOptions, memSize, err := backupinfo.FetchPreviousBackups(ctx, &mem, user, - makeCloudStorage, backupDestination.PrevBackupURIs, *initialDetails.EncryptionOptions, &kmsEnv) - - if err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - defer func() { - mem.Shrink(ctx, memSize) - }() - - if len(prevBackups) > 0 { - baseManifest := prevBackups[0] - if baseManifest.DescriptorCoverage == tree.AllDescriptors && - !initialDetails.FullCluster { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Errorf("cannot append a backup of specific tables or databases to a cluster backup") - } - - if err := requireEnterprise(execCfg, "incremental"); err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - lastEndTime := prevBackups[len(prevBackups)-1].EndTime - if lastEndTime.Compare(initialDetails.EndTime) > 0 { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, - errors.Newf("`AS OF SYSTEM TIME` %s must be greater than "+ - "the previous 
backup's end time of %s.", - initialDetails.EndTime.GoTime(), lastEndTime.GoTime()) - } - } - - localityKVs := make([]string, len(backupDestination.URIsByLocalityKV)) - i := 0 - for k := range backupDestination.URIsByLocalityKV { - localityKVs[i] = k - i++ - } - - for i := range prevBackups { - prevBackup := prevBackups[i] - // IDs are how we identify tables, and those are only meaningful in the - // context of their own cluster, so we need to ensure we only allow - // incremental previous backups that we created. - if fromCluster := prevBackup.ClusterID; !fromCluster.Equal(execCfg.NodeInfo.LogicalClusterID()) { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Newf("previous BACKUP belongs to cluster %s", fromCluster.String()) - } - - prevLocalityKVs := prevBackup.LocalityKVs - - // Checks that each layer in the backup uses the same localities - // Does NOT check that each locality/layer combination is actually at the - // expected locations. - // This is complex right now, but should be easier shortly. - // TODO(benbardin): Support verifying actual existence of localities for - // each layer after deprecating TO-syntax in 22.2 - sort.Strings(localityKVs) - sort.Strings(prevLocalityKVs) - if !(len(localityKVs) == 0 && len(prevLocalityKVs) == 0) && !reflect.DeepEqual(localityKVs, - prevLocalityKVs) { - // Note that this won't verify the default locality. That's not - // necessary, because the default locality defines the backup manifest - // location. If that URI isn't right, the backup chain will fail to - // load. - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, errors.Newf( - "Requested backup has localities %s, but a previous backup layer in this collection has localities %s. "+ - "Mismatched backup layers are not supported. Please take a new full backup with the new localities, or an "+ - "incremental backup with matching localities.", - localityKVs, prevLocalityKVs, - ) - } - } - - // updatedDetails and backupManifest should be treated as read-only after - // they're returned from their respective functions. Future changes to those - // objects should be made within those functions. 
- updatedDetails, err := updateBackupDetails( - ctx, - initialDetails, - backupDestination.CollectionURI, - backupDestination.DefaultURI, - backupDestination.ChosenSubdir, - backupDestination.URIsByLocalityKV, - prevBackups, - encryptionOptions, - &kmsEnv) - if err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - - backupManifest, err := createBackupManifest( - ctx, - execCfg, - txn, - updatedDetails, - prevBackups) - if err != nil { - return jobspb.BackupDetails{}, backuppb.BackupManifest{}, err - } - - return updatedDetails, backupManifest, nil -} - func getTenantInfo( ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, jobDetails jobspb.BackupDetails, ) ([]roachpb.Span, []descpb.TenantInfoWithUsage, error) { diff --git a/pkg/ccl/backupccl/backupdest/BUILD.bazel b/pkg/ccl/backupccl/backupdest/BUILD.bazel index d049e0a7b081..315cb4920339 100644 --- a/pkg/ccl/backupccl/backupdest/BUILD.bazel +++ b/pkg/ccl/backupccl/backupdest/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/util/ioctx", "//pkg/util/mon", "//pkg/util/timeutil", + "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/ccl/backupccl/backupdest/backup_destination.go b/pkg/ccl/backupccl/backupdest/backup_destination.go index 8616f9dcb1b9..7e4b43c70d4a 100644 --- a/pkg/ccl/backupccl/backupdest/backup_destination.go +++ b/pkg/ccl/backupccl/backupdest/backup_destination.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -502,6 +503,7 @@ func ResolveBackupManifests( ctx context.Context, mem *mon.BoundAccount, baseStores []cloud.ExternalStorage, + incStores []cloud.ExternalStorage, mkStore cloud.ExternalStorageFromURIFactory, fullyResolvedBaseDirectory []string, fullyResolvedIncrementalsDirectory []string, @@ -517,6 +519,9 @@ func ResolveBackupManifests( reservedMemSize int64, _ error, ) { + ctx, sp := tracing.ChildSpan(ctx, "backupdest.ResolveBackupManifests") + defer sp.Finish() + var ownedMemSize int64 defer func() { if ownedMemSize != 0 { @@ -530,16 +535,6 @@ func ResolveBackupManifests( } ownedMemSize += memSize - incStores := make([]cloud.ExternalStorage, len(fullyResolvedIncrementalsDirectory)) - for i := range fullyResolvedIncrementalsDirectory { - store, err := mkStore(ctx, fullyResolvedIncrementalsDirectory[i], user) - if err != nil { - return nil, nil, nil, 0, errors.Wrapf(err, "failed to open backup storage location") - } - defer store.Close() - incStores[i] = store - } - var prev []string if len(incStores) > 0 { prev, err = FindPriorBackups(ctx, incStores[0], includeManifest) @@ -566,8 +561,8 @@ func ResolveBackupManifests( // If we discovered additional layers, handle them too. if numLayers > 1 { numPartitions := len(fullyResolvedIncrementalsDirectory) - // We need the parsed base URI (/) for each partition to calculate the - // URI to each layer in that partition below. + // We need the parsed base URI (/) for each partition to + // calculate the URI to each layer in that partition below. 
baseURIs := make([]*url.URL, numPartitions) for i := range fullyResolvedIncrementalsDirectory { baseURIs[i], err = url.Parse(fullyResolvedIncrementalsDirectory[i]) @@ -576,31 +571,52 @@ func ResolveBackupManifests( } } - // For each layer, we need to load the default manifest then calculate the URI and the - // locality info for each partition. + // For each backup layer we construct the default URI. We don't load the + // manifests in this loop since we want to do that concurrently. for i := range prev { - defaultManifestForLayer, memSize, err := backupinfo.ReadBackupManifest(ctx, mem, - incStores[0], prev[i], encryption, kmsEnv) - if err != nil { - return nil, nil, nil, 0, err - } - ownedMemSize += memSize - mainBackupManifests[i+1] = defaultManifestForLayer - // prev[i] is the path to the manifest file itself for layer i -- the // dirname piece of that path is the subdirectory in each of the // partitions in which we'll also expect to find a partition manifest. // Recall full inc URI is // incSubDir := path.Dir(prev[i]) + u := *baseURIs[0] // NB: makes a copy to avoid mutating the baseURI. + u.Path = backuputils.JoinURLPath(u.Path, incSubDir) + defaultURIs[i+1] = u.String() + } + + // Load the default backup manifests for each backup layer, this is done + // concurrently. + enc := jobspb.BackupEncryptionOptions{ + Mode: jobspb.EncryptionMode_None, + } + if encryption != nil { + enc = *encryption + } + defaultManifestsForEachLayer, _, memSize, err := backupinfo.FetchPreviousBackups(ctx, mem, user, + mkStore, defaultURIs[1:], enc, kmsEnv) + if err != nil { + return nil, nil, nil, 0, err + } + ownedMemSize += memSize + + // Iterate over the layers one last time to memoize the loaded manifests and + // read the locality info. + // + // TODO(adityamaru): Parallelize the loading of the locality descriptors. + for i := range prev { + // The manifest for incremental layer i slots in at i+1 since the full + // backup manifest occupies index 0 in `mainBackupManifests`. + mainBackupManifests[i+1] = defaultManifestsForEachLayer[i] + incSubDir := path.Dir(prev[i]) partitionURIs := make([]string, numPartitions) for j := range baseURIs { u := *baseURIs[j] // NB: makes a copy to avoid mutating the baseURI. 
u.Path = backuputils.JoinURLPath(u.Path, incSubDir) partitionURIs[j] = u.String() } - defaultURIs[i+1] = partitionURIs[0] + localityInfo[i+1], err = backupinfo.GetLocalityInfo(ctx, incStores, partitionURIs, - defaultManifestForLayer, encryption, kmsEnv, incSubDir) + defaultManifestsForEachLayer[i], encryption, kmsEnv, incSubDir) if err != nil { return nil, nil, nil, 0, err } diff --git a/pkg/ccl/backupccl/backupdest/incrementals.go b/pkg/ccl/backupccl/backupdest/incrementals.go index 3431cfb68222..38438fc0fb0a 100644 --- a/pkg/ccl/backupccl/backupdest/incrementals.go +++ b/pkg/ccl/backupccl/backupdest/incrementals.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -60,6 +61,9 @@ func CollectionAndSubdir(path string, subdir string) (string, string) { func FindPriorBackups( ctx context.Context, store cloud.ExternalStorage, includeManifest bool, ) ([]string, error) { + ctx, sp := tracing.ChildSpan(ctx, "backupdest.FindPriorBackups") + defer sp.Finish() + var prev []string if err := store.List(ctx, "", listingDelimDataSlash, func(p string) error { if ok, err := path.Match(incBackupSubdirGlob+backupbase.BackupManifestName, p); err != nil { @@ -92,6 +96,9 @@ func FindPriorBackups( func backupsFromLocation( ctx context.Context, user username.SQLUsername, execCfg *sql.ExecutorConfig, loc string, ) ([]string, error) { + ctx, sp := tracing.ChildSpan(ctx, "backupdest.backupsFromLocation") + defer sp.Finish() + mkStore := execCfg.DistSQLSrv.ExternalStorageFromURI store, err := mkStore(ctx, loc, user) if err != nil { @@ -102,6 +109,35 @@ func backupsFromLocation( return prev, err } +// MakeBackupDestinationStores makes ExternalStorage handles to the passed in +// destinationDirs, and returns a cleanup function that closes these stores. It +// is the caller's responsibility to call the returned cleanup function. +func MakeBackupDestinationStores( + ctx context.Context, + user username.SQLUsername, + mkStore cloud.ExternalStorageFromURIFactory, + destinationDirs []string, +) ([]cloud.ExternalStorage, func() error, error) { + incStores := make([]cloud.ExternalStorage, len(destinationDirs)) + for i := range destinationDirs { + store, err := mkStore(ctx, destinationDirs[i], user) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to open backup storage location") + } + incStores[i] = store + } + + return incStores, func() error { + // Close all the incremental stores in the returned cleanup function.
+ for _, store := range incStores { + if err := store.Close(); err != nil { + return err + } + } + return nil + }, nil +} + // ResolveIncrementalsBackupLocation returns the resolved locations of // incremental backups by looking into either the explicitly provided // incremental backup collections, or the full backup collections if no explicit @@ -114,6 +150,9 @@ func ResolveIncrementalsBackupLocation( fullBackupCollections []string, subdir string, ) ([]string, error) { + ctx, sp := tracing.ChildSpan(ctx, "backupdest.ResolveIncrementalsBackupLocation") + defer sp.Finish() + if len(explicitIncrementalCollections) > 0 { incPaths, err := backuputils.AppendPaths(explicitIncrementalCollections, subdir) if err != nil { diff --git a/pkg/ccl/backupccl/backupinfo/manifest_handling.go b/pkg/ccl/backupccl/backupinfo/manifest_handling.go index 000578e202b6..a48efa38af60 100644 --- a/pkg/ccl/backupccl/backupinfo/manifest_handling.go +++ b/pkg/ccl/backupccl/backupinfo/manifest_handling.go @@ -146,6 +146,8 @@ func ReadBackupManifestFromStore( encryption *jobspb.BackupEncryptionOptions, kmsEnv cloud.KMSEnv, ) (backuppb.BackupManifest, int64, error) { + ctx, sp := tracing.ChildSpan(ctx, "backupinfo.ReadBackupManifestFromStore") + defer sp.Finish() backupManifest, memSize, err := ReadBackupManifest(ctx, mem, exportStore, backupbase.BackupManifestName, encryption, kmsEnv) if err != nil { @@ -697,8 +699,8 @@ func LoadBackupManifests( var ErrLocalityDescriptor = errors.New(`Locality Descriptor not found`) // GetLocalityInfo takes a list of stores and their URIs, along with the main -// backup manifest searches each for the locality pieces listed in the the -// main manifest, returning the mapping. +// backup manifest searches each for the locality pieces listed in the main +// manifest, returning the mapping. func GetLocalityInfo( ctx context.Context, stores []cloud.ExternalStorage, @@ -1259,6 +1261,7 @@ func FetchPreviousBackups( } // getBackupManifests fetches the backup manifest from a list of backup URIs. +// The manifests are loaded from External Storage in parallel. func getBackupManifests( ctx context.Context, mem *mon.BoundAccount, diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index cb77cd689383..81822f60d2e2 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1360,15 +1360,29 @@ func doRestorePlan( // vacuous, and we should proceed with restoring the base backup. // // Note that incremental _backup_ requests to this location will fail loudly instead. 
- baseStores := make([]cloud.ExternalStorage, len(fullyResolvedBaseDirectory)) - for i := range fullyResolvedBaseDirectory { - store, err := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, fullyResolvedBaseDirectory[i], p.User()) - if err != nil { - return errors.Wrapf(err, "failed to open backup storage location") + mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI + baseStores, cleanupFn, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore, + fullyResolvedBaseDirectory) + if err != nil { + return err + } + defer func() { + if err := cleanupFn(); err != nil { + log.Warningf(ctx, "failed to close incremental store: %+v", err) } - defer store.Close() - baseStores[i] = store + }() + + incStores, cleanupFn, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore, + fullyResolvedIncrementalsDirectory) + if err != nil { + return err } + defer func() { + if err := cleanupFn(); err != nil { + log.Warningf(ctx, "failed to close incremental store: %+v", err) + } + }() + ioConf := baseStores[0].ExternalIOConf() kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings, &ioConf, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor) @@ -1422,12 +1436,11 @@ func doRestorePlan( var mainBackupManifests []backuppb.BackupManifest var localityInfo []jobspb.RestoreDetails_BackupLocalityInfo var memReserved int64 - mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI if len(from) <= 1 { // Incremental layers are not specified explicitly. They will be searched for automatically. // This could be either INTO-syntax, OR TO-syntax. defaultURIs, mainBackupManifests, localityInfo, memReserved, err = backupdest.ResolveBackupManifests( - ctx, &mem, baseStores, mkStore, fullyResolvedBaseDirectory, + ctx, &mem, baseStores, incStores, mkStore, fullyResolvedBaseDirectory, fullyResolvedIncrementalsDirectory, endTime, encryption, &kmsEnv, p.User(), ) } else { diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index eb1083cd822f..dd95f31dfdcd 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -88,6 +88,9 @@ func (m manifestInfoReader) showBackup( kmsEnv cloud.KMSEnv, resultsCh chan<- tree.Datums, ) error { + ctx, sp := tracing.ChildSpan(ctx, "backupccl.showBackup") + defer sp.Finish() + var memReserved int64 defer func() { @@ -401,10 +404,20 @@ you must pass the 'encryption_info_dir' parameter that points to the directory o info.subdir = computedSubdir mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI + incStores, cleanupFn, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore, + fullyResolvedIncrementalsDirectory) + if err != nil { + return err + } + defer func() { + if err := cleanupFn(); err != nil { + log.Warningf(ctx, "failed to close incremental store: %+v", err) + } + }() info.defaultURIs, info.manifests, info.localityInfo, memReserved, err = backupdest.ResolveBackupManifests( - ctx, &mem, baseStores, mkStore, fullyResolvedDest, + ctx, &mem, baseStores, incStores, mkStore, fullyResolvedDest, fullyResolvedIncrementalsDirectory, hlc.Timestamp{}, encryption, &kmsEnv, p.User()) defer func() { mem.Shrink(ctx, memReserved) @@ -649,6 +662,9 @@ func backupShowerDefault( return backupShower{ header: backupShowerHeaders(showSchemas, opts), fn: func(ctx context.Context, info backupInfo) ([]tree.Datums, error) { + ctx, sp := tracing.ChildSpan(ctx, "backupccl.backupShowerDefault.fn") + defer sp.Finish() + var rows []tree.Datums for layer, manifest := range info.manifests { // Map database ID to descriptor name. 
@@ -1267,5 +1283,5 @@ func showBackupsInCollectionPlanHook( } func init() { - sql.AddPlanHook("show backup", showBackupPlanHook) + sql.AddPlanHook("backupccl.showBackupPlanHook", showBackupPlanHook) } diff --git a/pkg/security/auth.go b/pkg/security/auth.go index 79d818346e91..7fbfd497c219 100644 --- a/pkg/security/auth.go +++ b/pkg/security/auth.go @@ -114,15 +114,18 @@ func GetCertificateUserScope( // https://github.com/golang/go/blob/go1.8.1/src/crypto/tls/handshake_server.go#L723:L742 peerCert := tlsState.PeerCertificates[0] for _, uri := range peerCert.URIs { - tenantID, user, err := ParseTenantURISAN(uri.String()) - if err != nil { - return nil, err - } - scope := CertificateUserScope{ - Username: user, - TenantID: tenantID, + uriString := uri.String() + if URISANHasCRDBPrefix(uriString) { + tenantID, user, err := ParseTenantURISAN(uriString) + if err != nil { + return nil, err + } + scope := CertificateUserScope{ + Username: user, + TenantID: tenantID, + } + userScopes = append(userScopes, scope) } - userScopes = append(userScopes, scope) } if len(userScopes) == 0 { users := getCertificatePrincipals(peerCert) diff --git a/pkg/security/auth_test.go b/pkg/security/auth_test.go index d1292879a30d..9dd11dd95889 100644 --- a/pkg/security/auth_test.go +++ b/pkg/security/auth_test.go @@ -85,58 +85,98 @@ func makeFakeTLSState(t *testing.T, spec string) *tls.ConnectionState { func TestGetCertificateUserScope(t *testing.T) { defer leaktest.AfterTest(t)() - // Nil TLS state. - if _, err := security.GetCertificateUserScope(nil); err == nil { - t.Error("unexpected success") - } + t.Run("nil TLS state", func(t *testing.T) { + if _, err := security.GetCertificateUserScope(nil); err == nil { + t.Error("unexpected success") + } + }) - // No certificates. - if _, err := security.GetCertificateUserScope(makeFakeTLSState(t, "")); err == nil { - t.Error("unexpected success") - } + t.Run("no certificates", func(t *testing.T) { + if _, err := security.GetCertificateUserScope(makeFakeTLSState(t, "")); err == nil { + t.Error("unexpected success") + } + }) - // Good request: single certificate. - if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo")); err != nil { - t.Error(err) - } else { - require.Equal(t, 1, len(userScopes)) - require.Equal(t, "foo", userScopes[0].Username) - require.True(t, userScopes[0].Global) - } + t.Run("good request: single certificate", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "foo", userScopes[0].Username) + require.True(t, userScopes[0].Global) + } + }) - // Request with multiple certs, but only one chain (eg: origin certs are client and CA). - if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo;CA")); err != nil { - t.Error(err) - } else { - require.Equal(t, 1, len(userScopes)) - require.Equal(t, "foo", userScopes[0].Username) - require.True(t, userScopes[0].Global) - } + t.Run("request with multiple certs, but only one chain (eg: origin certs are client and CA)", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo;CA")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "foo", userScopes[0].Username) + require.True(t, userScopes[0].Global) + } + }) - // Always use the first certificate. 
- if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo;bar")); err != nil { - t.Error(err) - } else { - require.Equal(t, 1, len(userScopes)) - require.Equal(t, "foo", userScopes[0].Username) - require.True(t, userScopes[0].Global) - } + t.Run("always use the first certificate", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo;bar")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "foo", userScopes[0].Username) + require.True(t, userScopes[0].Global) + } + }) - // Extract all of the principals from the first certificate. - if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo,dns:bar,dns:blah;CA")); err != nil { - t.Error(err) - } else { - require.Equal(t, 3, len(userScopes)) - require.True(t, userScopes[0].Global) - } - if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo,uri:crdb://tenant/123/user/foo;CA")); err != nil { - t.Error(err) - } else { - require.Equal(t, 1, len(userScopes)) - require.Equal(t, "foo", userScopes[0].Username) - require.Equal(t, roachpb.MakeTenantID(123), userScopes[0].TenantID) - require.False(t, userScopes[0].Global) - } + t.Run("extract all of the principals from the first certificate", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope(makeFakeTLSState(t, "foo,dns:bar,dns:blah;CA")); err != nil { + t.Error(err) + } else { + require.Equal(t, 3, len(userScopes)) + require.True(t, userScopes[0].Global) + } + }) + + t.Run("extracts username, tenantID from tenant URI SAN", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope( + makeFakeTLSState(t, "foo,uri:crdb://tenant/123/user/foo;CA")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "foo", userScopes[0].Username) + require.Equal(t, roachpb.MakeTenantID(123), userScopes[0].TenantID) + require.False(t, userScopes[0].Global) + } + }) + + t.Run("extracts tenant URI SAN even when multiple URIs, where one URI is not of CRBD format", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope( + makeFakeTLSState(t, "foo,uri:mycompany:sv:rootclient:dev:usw1,uri:crdb://tenant/123/user/foo;CA")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "foo", userScopes[0].Username) + require.Equal(t, roachpb.MakeTenantID(123), userScopes[0].TenantID) + require.False(t, userScopes[0].Global) + } + }) + + t.Run("errors when tenant URI SAN is not of expected format, even if other URI SAN is provided", func(t *testing.T) { + userScopes, err := security.GetCertificateUserScope( + makeFakeTLSState(t, "foo,uri:mycompany:sv:rootclient:dev:usw1,uri:crdb://tenant/bad/format/123;CA")) + require.Nil(t, userScopes) + require.ErrorContains(t, err, "invalid tenant URI SAN") + }) + + t.Run("falls back to global client cert when crdb URI SAN scheme is not followed", func(t *testing.T) { + if userScopes, err := security.GetCertificateUserScope( + makeFakeTLSState(t, "sanuri,uri:mycompany:sv:rootclient:dev:usw1;CA")); err != nil { + t.Error(err) + } else { + require.Equal(t, 1, len(userScopes)) + require.Equal(t, "sanuri", userScopes[0].Username) + require.True(t, userScopes[0].Global) + } + }) } func TestSetCertPrincipalMap(t *testing.T) { diff --git a/pkg/security/x509.go b/pkg/security/x509.go index 7e7dbc65f2d1..413a4a040a66 100644 --- a/pkg/security/x509.go +++ b/pkg/security/x509.go @@ -38,7 +38,8 
@@ const ( validFrom = -time.Hour * 24 maxPathLength = 1 caCommonName = "Cockroach CA" - tenantURISANFormatString = "crdb://tenant/%d/user/%s" + tenantURISANPrefixString = "crdb://" + tenantURISANFormatString = tenantURISANPrefixString + "tenant/%d/user/%s" // TenantsOU is the OrganizationalUnit that determines a client certificate should be treated as a tenant client // certificate (as opposed to a KV node client certificate). @@ -333,6 +334,11 @@ func MakeTenantURISANs( return urls, nil } +// URISANHasCRDBPrefix indicates whether a URI string has the tenant URI SAN prefix. +func URISANHasCRDBPrefix(rawlURI string) bool { + return strings.HasPrefix(rawlURI, tenantURISANPrefixString) +} + // ParseTenantURISAN extracts the user and tenant ID contained within a tenant URI SAN. func ParseTenantURISAN(rawURL string) (roachpb.TenantID, string, error) { r := strings.NewReader(rawURL) diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index b400ebdf04e1..2b131997bf95 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -2649,21 +2649,9 @@ ALTER TABLE t_add_column_not_null ADD COLUMN j INT GENERATED ALWAYS AS (NULL:::I statement error pgcode 23502 null value in column "j" violates not-null constraint ALTER TABLE t_add_column_not_null ADD COLUMN j INT GENERATED ALWAYS AS (NULL::INT) VIRTUAL NOT NULL UNIQUE; -# Note that this should absolutely not succeed, but it does because of #81675. -# We'll rely on that in order to test that the subsequent index build fails. -# When addressing #81675, this portion of this test will need to be removed. -skipif config local-legacy-schema-changer -statement ok -ALTER TABLE t_add_column_not_null ADD COLUMN j INT GENERATED ALWAYS AS (NULL::INT) VIRTUAL NOT NULL; - -onlyif config local-legacy-schema-changer statement error validation of NOT NULL constraint failed: validation of CHECK "j IS NOT NULL" failed on row: i=1, j=NULL ALTER TABLE t_add_column_not_null ADD COLUMN j INT GENERATED ALWAYS AS (NULL::INT) VIRTUAL NOT NULL; -skipif config local-legacy-schema-changer -statement error pgcode 23502 null value in column "j" violates not-null constraint -CREATE INDEX idx_j ON t_add_column_not_null (j); - statement ok DROP TABLE t_add_column_not_null diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go index 01a43c5d905e..3c0a6f08f789 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go @@ -44,6 +44,7 @@ func alterTableAddColumn( // We don't support handling zone config related properties for tables, so // throw an unsupported error. fallBackIfZoneConfigExists(b, d, tbl.TableID) + fallBackIfVirtualColumnWithNotNullConstraint(t) // Check column non-existence. 
{ elts := b.ResolveColumn(tbl.TableID, d.Name, ResolveParams{ diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go index c4e8f7cc9130..a4d897f3b35c 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go @@ -665,3 +665,17 @@ func fallBackIfZoneConfigExists(b BuildCtx, n tree.NodeFormatter, id catid.DescI } } } + +// fallBackIfVirtualColumnWithNotNullConstraint throws an unimplemented error +// if the to-be-added column `d` is a virtual column with not null constraint. +// This is a quick, temporary fix for the following troubled stmt in the +// declarative schema changer: +// `ALTER TABLE t ADD COLUMN j INT AS (NULL::INT) VIRTUAL NOT NULL;` succeeded +// but expectedly failed in the legacy schema changer. +func fallBackIfVirtualColumnWithNotNullConstraint(t *tree.AlterTableAddColumn) { + d := t.ColumnDef + if d.IsVirtual() && d.Nullable.Nullability == tree.NotNull { + panic(scerrors.NotImplementedErrorf(t, + "virtual column with NOT NULL constraint is not supported")) + } +} diff --git a/pkg/sql/schemachanger/sctest/cumulative.go b/pkg/sql/schemachanger/sctest/cumulative.go index e3cc32c4f22a..039d37d5497a 100644 --- a/pkg/sql/schemachanger/sctest/cumulative.go +++ b/pkg/sql/schemachanger/sctest/cumulative.go @@ -360,15 +360,20 @@ var runAllBackups = flag.Bool( // cluster constructor needs to provide a cluster with CCL BACKUP/RESTORE // functionality enabled. func Backup(t *testing.T, path string, newCluster NewClusterFunc) { - var after [][]string + var after [][]string // CREATE_STATEMENT for all descriptors after finishing `stmts` in each test case. var dbName string r, _ := randutil.NewTestRand() const runRate = .5 + maybeRandomlySkip := func(t *testing.T) { if !*runAllBackups && r.Float64() >= runRate { skip.IgnoreLint(t, "skipping due to randomness") } } + + // A function that executes `setup` first and then count the number of + // postCommit and postCommitNonRevertible stages for executing `stmts`. + // It also initializes `after` and `dbName` here. countStages := func( t *testing.T, setup, stmts []parser.Statement, ) (postCommit, nonRevertible int) { @@ -389,39 +394,22 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) { }) return postCommit, nonRevertible } - var testBackupRestoreCase func( - t *testing.T, setup, stmts []parser.Statement, ord int, - ) - testFunc := func(t *testing.T, _ string, _ bool, setup, stmts []parser.Statement) { - postCommit, nonRevertible := countStages(t, setup, stmts) - n := postCommit + nonRevertible - t.Logf( - "test case has %d revertible post-commit stages and %d non-revertible"+ - " post-commit stages", postCommit, nonRevertible, - ) - for i := 0; i <= n; i++ { - if !t.Run( - fmt.Sprintf("backup/restore stage %d of %d", i, n), - func(t *testing.T) { - maybeRandomlySkip(t) - testBackupRestoreCase(t, setup, stmts, i) - }, - ) { - return - } - } - } - type stage struct { - p scplan.Plan - stageIdx int - resume chan error - } - mkStage := func(p scplan.Plan, stageIdx int) stage { - return stage{p: p, stageIdx: stageIdx, resume: make(chan error)} - } - testBackupRestoreCase = func( + + // A function that takes backup at `ord`-th stage while executing `stmts` after + // finishing `setup`. It also takes `ord` backups at each of the preceding stage + // if it's a revertible stage. 
+ // It then restores the backup(s) in various "flavors" (see + // comment below for details) and expects the restore to finish the schema change job + // as if the backup/restore had never happened. + testBackupRestoreCase := func( t *testing.T, setup, stmts []parser.Statement, ord int, ) { + type stage struct { + p scplan.Plan + stageIdx int + resume chan error + } + stageChan := make(chan stage) ctx, cancel := context.WithCancel(context.Background()) db, cleanup := newCluster(t, &scexec.TestingKnobs{ @@ -430,7 +418,7 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) { return nil } if stageChan != nil { - s := mkStage(p, stageIdx) + s := stage{p: p, stageIdx: stageIdx, resume: make(chan error)} select { case stageChan <- s: case <-ctx.Done(): @@ -561,21 +549,75 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) { t.Logf("finished") for i, b := range backups { - for _, isSchemaOnly := range []bool{true, false} { - name := "" - if isSchemaOnly { - name = "schema-only" - } - t.Run(name, func(t *testing.T) { + // For each backup, we restore it in three flavors. + // 1. RESTORE DATABASE + // 2. RESTORE DATABASE WITH schema_only + // 3. RESTORE TABLE tbl1, tbl2, ..., tblN + // We then assert that the restored database should correctly finish + // the ongoing schema change job when the backup was taken, and + // reach the expected state as if the backup/restore had not happened at all. + // Skip a backup randomly. + type backupConsumptionFlavor struct { + name string + restoreSetup []string + restoreQuery string + } + flavors := []backupConsumptionFlavor{ + { + name: "restore database", + restoreSetup: []string{ + fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName), + "SET use_declarative_schema_changer = 'off'", + }, + restoreQuery: fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s'", dbName, b.url), + }, + { + name: "restore database with schema-only", + restoreSetup: []string{ + fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName), + "SET use_declarative_schema_changer = 'off'", + }, + restoreQuery: fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s' with schema_only", dbName, b.url), + }, + } + + // For the third flavor, we restore all tables in the backup. + // Skip it if there are no tables. + rows := tdb.QueryStr(t, ` + SELECT parent_schema_name, object_name + FROM [SHOW BACKUP FROM LATEST IN $1] + WHERE database_name = $2 AND object_type = 'table'`, b.url, dbName) + var tablesToRestore []string + for _, row := range rows { + tablesToRestore = append(tablesToRestore, fmt.Sprintf("%s.%s.%s", dbName, row[0], row[1])) + } + + if len(tablesToRestore) > 0 { + flavors = append(flavors, backupConsumptionFlavor{ + name: "restore all tables in database", + restoreSetup: []string{ + fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName), + fmt.Sprintf("CREATE DATABASE %q", dbName), + "SET use_declarative_schema_changer = 'off'", + }, + restoreQuery: fmt.Sprintf("RESTORE TABLE %s FROM LATEST IN '%s' WITH skip_missing_sequences", + strings.Join(tablesToRestore, ","), b.url), + }) + } + + // TODO (xiang): Add here the fourth flavor that restores + // only a subset, maybe randomly chosen, of all tables with + // `RESTORE TABLE`. Currently, it's blocked by issue #87518. + // We will need to change what the expected output will be + // in this case, since it will no longer be simply `before` + // and `after`.
+ + for _, flavor := range flavors { + t.Run(flavor.name, func(t *testing.T) { maybeRandomlySkip(t) - t.Logf("testing backup %d %v", i, b.isRollback) - tdb.Exec(t, fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName)) - tdb.Exec(t, "SET use_declarative_schema_changer = 'off'") - restoreQuery := fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s'", dbName, b.url) - if isSchemaOnly { - restoreQuery = restoreQuery + " with schema_only" - } - tdb.Exec(t, restoreQuery) + t.Logf("testing backup %d (rollback=%v)", i, b.isRollback) + tdb.ExecMultiple(t, flavor.restoreSetup...) + tdb.Exec(t, flavor.restoreQuery) tdb.Exec(t, fmt.Sprintf("USE %q", dbName)) waitForSchemaChangesToFinish(t, tdb) afterRestore := tdb.QueryStr(t, fetchDescriptorStateQuery) @@ -596,6 +638,27 @@ SELECT * FROM crdb_internal.invalid_objects WHERE database_name != 'backups' } } } + + testFunc := func(t *testing.T, _ string, _ bool, setup, stmts []parser.Statement) { + postCommit, nonRevertible := countStages(t, setup, stmts) + n := postCommit + nonRevertible + t.Logf( + "test case has %d revertible post-commit stages and %d non-revertible"+ + " post-commit stages", postCommit, nonRevertible, + ) + for i := 0; i <= n; i++ { + if !t.Run( + fmt.Sprintf("backup/restore stage %d of %d", i, n), + func(t *testing.T) { + maybeRandomlySkip(t) + testBackupRestoreCase(t, setup, stmts, i) + }, + ) { + return + } + } + } + cumulativeTest(t, path, testFunc) }
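A note on the store-handling refactor above: backupdest.ResolveBackupManifests no longer opens the incremental-destination ExternalStorage handles itself. Callers such as doRestorePlan and showBackup now open both the base and incremental stores once with backupdest.MakeBackupDestinationStores, pass the pre-opened incStores down, and close everything via the returned cleanup closures. The sketch below is not part of the diff; it only illustrates the intended caller-side wiring. The wrapper name resolveForRestore, its parameter list, and the cloud.KMSEnv parameter type are assumptions inferred from the call sites above.

// Sketch only; assumes the backupccl/backupdest APIs introduced in the diff above.
package backupexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupdest"
	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
	"github.com/cockroachdb/cockroach/pkg/cloud"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// resolveForRestore is a hypothetical wrapper showing the intended call
// pattern: open the base and incremental stores once, defer both cleanup
// closures, and hand the pre-opened incStores to ResolveBackupManifests.
func resolveForRestore(
	ctx context.Context,
	p sql.PlanHookState,
	mem *mon.BoundAccount,
	baseDirs, incDirs []string,
	endTime hlc.Timestamp,
	encryption *jobspb.BackupEncryptionOptions,
	kmsEnv cloud.KMSEnv,
) ([]string, []backuppb.BackupManifest, []jobspb.RestoreDetails_BackupLocalityInfo, int64, error) {
	mkStore := p.ExecCfg().DistSQLSrv.ExternalStorageFromURI

	// Open all base-destination stores; the returned cleanup closure must be
	// called by the caller, per the MakeBackupDestinationStores contract.
	baseStores, cleanupBase, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore, baseDirs)
	if err != nil {
		return nil, nil, nil, 0, err
	}
	defer func() {
		if err := cleanupBase(); err != nil {
			log.Warningf(ctx, "failed to close base store: %+v", err)
		}
	}()

	// Same for the incremental destinations; these handles are reused by
	// ResolveBackupManifests instead of being reopened inside it.
	incStores, cleanupInc, err := backupdest.MakeBackupDestinationStores(ctx, p.User(), mkStore, incDirs)
	if err != nil {
		return nil, nil, nil, 0, err
	}
	defer func() {
		if err := cleanupInc(); err != nil {
			log.Warningf(ctx, "failed to close incremental store: %+v", err)
		}
	}()

	return backupdest.ResolveBackupManifests(
		ctx, mem, baseStores, incStores, mkStore, baseDirs, incDirs,
		endTime, encryption, kmsEnv, p.User(),
	)
}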