diff --git a/pkg/ccl/backupccl/backup_destination.go b/pkg/ccl/backupccl/backup_destination.go index a8edf77f29e2..a4dc5d548722 100644 --- a/pkg/ccl/backupccl/backup_destination.go +++ b/pkg/ccl/backupccl/backup_destination.go @@ -10,8 +10,6 @@ package backupccl import ( "context" - "encoding/hex" - "fmt" "net/url" "strings" @@ -27,12 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -67,6 +63,27 @@ func fetchPreviousBackups( return prevBackups, encryptionOptions, size, nil } +// backupDestination encapsulates information that is populated while resolving +// the destination of a backup. +type backupDestination struct { + // collectionURI is the URI pointing to the backup collection. + collectionURI string + + // defaultURI is the full path of the defaultURI of the backup. + defaultURI string + + // chosenSubdir is the automatically chosen suffix within the collection path + // if we're backing up INTO a collection. + chosenSubdir string + + // urisByLocalityKV is a mapping from the locality tag to the corresponding + // locality aware backup URI. + urisByLocalityKV map[string]string + + // prevBackupURIs is the list of full paths for previous backups in the chain. + prevBackupURIs []string +} + // resolveDest resolves the true destination of a backup. The backup command // provided by the user may point to a backup collection, or a backup location // which auto-appends incremental backups to it. This method checks for these @@ -83,25 +100,16 @@ func resolveDest( endTime hlc.Timestamp, incrementalFrom []string, execCfg *sql.ExecutorConfig, -) ( - collectionURI string, - plannedBackupDefaultURI string, /* the full path for the planned backup */ - /* chosenSuffix is the automatically chosen suffix within the collection path - if we're backing up INTO a collection. */ - chosenSuffix string, - urisByLocalityKV map[string]string, - prevBackupURIs []string, /* list of full paths for previous backups in the chain */ - err error, -) { +) (backupDestination, error) { makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI defaultURI, _, err := getURIsByLocalityKV(dest.To, "") if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } - chosenSuffix = dest.Subdir - + var collectionURI string + chosenSuffix := dest.Subdir if chosenSuffix != "" { // The legacy backup syntax, BACKUP TO, leaves the dest.Subdir and collection parameters empty. 
collectionURI = defaultURI @@ -109,44 +117,46 @@ func resolveDest( if chosenSuffix == latestFileName { latest, err := readLatestFile(ctx, defaultURI, makeCloudStorage, user) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } chosenSuffix = latest } } - plannedBackupDefaultURI, urisByLocalityKV, err = getURIsByLocalityKV(dest.To, chosenSuffix) + plannedBackupDefaultURI, urisByLocalityKV, err := getURIsByLocalityKV(dest.To, chosenSuffix) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } - // At this point, the plannedBackupDefaultURI is the full path for the backup. For BACKUP - // INTO, this path includes the chosenSuffix. Once this function returns, the - // plannedBackupDefaultURI will be the full path for this backup in planning. + // At this point, the defaultURI is the full path for the backup. For BACKUP + // INTO, this path includes the chosenSubdir. Once this function returns, the + // defaultURI will be the full path for this backup in planning. if len(incrementalFrom) != 0 { // Legacy backup with deprecated BACKUP TO-syntax. - prevBackupURIs = incrementalFrom - return collectionURI, plannedBackupDefaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + prevBackupURIs := incrementalFrom + return backupDestination{ + collectionURI: collectionURI, + defaultURI: plannedBackupDefaultURI, + chosenSubdir: chosenSuffix, + urisByLocalityKV: urisByLocalityKV, + prevBackupURIs: prevBackupURIs, + }, nil } defaultStore, err := makeCloudStorage(ctx, plannedBackupDefaultURI, user) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } defer defaultStore.Close() exists, err := containsManifest(ctx, defaultStore) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } - if exists && !dest.Exists && chosenSuffix != "" && execCfg.Settings.Version.IsActive(ctx, - clusterversion.Start22_1) { + if exists && !dest.Exists && chosenSuffix != "" && + execCfg.Settings.Version.IsActive(ctx, clusterversion.Start22_1) { // We disallow a user from writing a full backup to a path in a collection containing an // existing backup iff we're 99.9% confident this backup was planned on a 22.1 node. - return "", - "", - "", - nil, - nil, + return backupDestination{}, errors.Newf("A full backup already exists in %s. "+ "Consider running an incremental backup to this full backup via `BACKUP INTO '%s' IN '%s'`", plannedBackupDefaultURI, chosenSuffix, dest.To[0]) @@ -168,7 +178,7 @@ func resolveDest( featureFullBackupUserSubdir, "'Full Backup with user defined subdirectory'", ); err != nil { - return "", "", "", nil, nil, errors.Wrapf(err, + return backupDestination{}, errors.Wrapf(err, "The full backup cannot get written to '%s', a user defined subdirectory. "+ "To take a full backup, remove the subdirectory from the backup command, "+ "(i.e. run 'BACKUP ... INTO '). "+ @@ -179,7 +189,13 @@ func resolveDest( } } // There's no full backup in the resolved subdirectory; therefore, we're conducting a full backup. - return collectionURI, plannedBackupDefaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + return backupDestination{ + collectionURI: collectionURI, + defaultURI: plannedBackupDefaultURI, + chosenSubdir: chosenSuffix, + urisByLocalityKV: urisByLocalityKV, + prevBackupURIs: nil, + }, nil } // The defaultStore contains a full backup; consequently, we're conducting an incremental backup. 
@@ -191,42 +207,48 @@ func resolveDest( dest.To, chosenSuffix) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } priorsDefaultURI, _, err := getURIsByLocalityKV(fullyResolvedIncrementalsLocation, "") if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } incrementalStore, err := makeCloudStorage(ctx, priorsDefaultURI, user) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } defer incrementalStore.Close() priors, err := FindPriorBackups(ctx, incrementalStore, OmitManifest) if err != nil { - return "", "", "", nil, nil, errors.Wrap(err, "adjusting backup destination to append new layer to existing backup") + return backupDestination{}, errors.Wrap(err, "adjusting backup destination to append new layer to existing backup") } + prevBackupURIs := make([]string, 0) for _, prior := range priors { priorURI, err := url.Parse(priorsDefaultURI) if err != nil { - return "", "", "", nil, nil, errors.Wrapf(err, "parsing default backup location %s", - priorsDefaultURI) + return backupDestination{}, errors.Wrapf(err, "parsing default backup location %s", priorsDefaultURI) } priorURI.Path = JoinURLPath(priorURI.Path, prior) prevBackupURIs = append(prevBackupURIs, priorURI.String()) } prevBackupURIs = append([]string{plannedBackupDefaultURI}, prevBackupURIs...) - // Within the chosenSuffix dir, differentiate incremental backups with partName. + // Within the chosenSubdir dir, differentiate incremental backups with partName. partName := endTime.GoTime().Format(DateBasedIncFolderName) defaultIncrementalsURI, urisByLocalityKV, err := getURIsByLocalityKV(fullyResolvedIncrementalsLocation, partName) if err != nil { - return "", "", "", nil, nil, err + return backupDestination{}, err } - return collectionURI, defaultIncrementalsURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, nil + return backupDestination{ + collectionURI: collectionURI, + defaultURI: defaultIncrementalsURI, + chosenSubdir: chosenSuffix, + urisByLocalityKV: urisByLocalityKV, + prevBackupURIs: prevBackupURIs, + }, nil } // getBackupManifests fetches the backup manifest from a list of backup URIs. @@ -450,17 +472,6 @@ func writeNewLatestFile( // sorted to the top. This will be the last latest file we write. It // Takes the one's complement of the timestamp so that files are sorted // lexicographically such that the most recent is always the top. - return cloud.WriteFile(ctx, exportStore, newTimestampedLatestFileName(), strings.NewReader(suffix)) -} - -// newTimestampedLatestFileName returns a string of a new latest filename -// with a suffixed version. It returns it in the format of LATEST- -// where version is a hex encoded one's complement of the timestamp. -// This means that as long as the supplied timestamp is correct, the filenames -// will adhere to a lexicographical/utf-8 ordering such that the most -// recent file is at the top. 
-func newTimestampedLatestFileName() string { - var buffer []byte - buffer = encoding.EncodeStringDescending(buffer, timeutil.Now().String()) - return fmt.Sprintf("%s/%s-%s", latestHistoryDirectory, latestFileName, hex.EncodeToString(buffer)) + versionedLatestFileName := newTimestampedFilename(latestFileName) + return cloud.WriteFile(ctx, exportStore, latestHistoryDirectory+"/"+versionedLatestFileName, strings.NewReader(suffix)) } diff --git a/pkg/ccl/backupccl/backup_destination_test.go b/pkg/ccl/backupccl/backup_destination_test.go index 32b877dfbd51..6fc551a1441d 100644 --- a/pkg/ccl/backupccl/backup_destination_test.go +++ b/pkg/ccl/backupccl/backup_destination_test.go @@ -121,7 +121,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { defaultDest, localitiesDest, err := getURIsByLocalityKV(to, "") require.NoError(t, err) - collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := resolveDest( + backupDestination, err := resolveDest( ctx, security.RootUserName(), jobspb.BackupDetails_Destination{To: to}, endTime, @@ -131,12 +131,12 @@ func TestBackupRestoreResolveDestination(t *testing.T) { require.NoError(t, err) // Not an INTO backup, so no collection of suffix info. - require.Equal(t, "", collectionURI) - require.Equal(t, "", chosenSuffix) + require.Equal(t, "", backupDestination.collectionURI) + require.Equal(t, "", backupDestination.chosenSubdir) - require.Equal(t, defaultDest, defaultURI) - require.Equal(t, localitiesDest, urisByLocalityKV) - require.Equal(t, incrementalFrom, prevBackupURIs) + require.Equal(t, defaultDest, backupDestination.defaultURI) + require.Equal(t, localitiesDest, backupDestination.urisByLocalityKV) + require.Equal(t, incrementalFrom, backupDestination.prevBackupURIs) } // The first initial full backup: BACKUP TO full. @@ -188,7 +188,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) { ) { endTime := hlc.Timestamp{WallTime: backupTime.UnixNano()} - collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := resolveDest( + backupDestination, err := resolveDest( ctx, security.RootUserName(), jobspb.BackupDetails_Destination{To: to}, endTime, @@ -198,11 +198,11 @@ func TestBackupRestoreResolveDestination(t *testing.T) { require.NoError(t, err) // Not a backup collection. - require.Equal(t, "", collectionURI) - require.Equal(t, "", chosenSuffix) - require.Equal(t, expectedDefault, defaultURI) - require.Equal(t, expectedLocalities, urisByLocalityKV) - require.Equal(t, expectedPrevBackups, prevBackupURIs) + require.Equal(t, "", backupDestination.collectionURI) + require.Equal(t, "", backupDestination.chosenSubdir) + require.Equal(t, expectedDefault, backupDestination.defaultURI) + require.Equal(t, expectedLocalities, backupDestination.urisByLocalityKV) + require.Equal(t, expectedPrevBackups, backupDestination.prevBackupURIs) } // Initial full backup: BACKUP TO baseDir. 
@@ -348,7 +348,7 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
 			if expectedIncDir != "" {
 				fullBackupExists = true
 			}
 
-			collectionURI, defaultURI, chosenSuffix, urisByLocalityKV, prevBackupURIs, err := resolveDest(
+			backupDestination, err := resolveDest(
 				ctx, security.RootUserName(),
 				jobspb.BackupDetails_Destination{To: collectionTo, Subdir: subdir, IncrementalStorage: incrementalTo, Exists: fullBackupExists},
@@ -368,13 +368,13 @@ func TestBackupRestoreResolveDestination(t *testing.T) {
 				localityDests[locality] = u.String()
 			}
 
-			require.Equal(t, collectionLoc, collectionURI)
-			require.Equal(t, expectedSuffix, chosenSuffix)
+			require.Equal(t, collectionLoc, backupDestination.collectionURI)
+			require.Equal(t, expectedSuffix, backupDestination.chosenSubdir)
 
-			require.Equal(t, expectedDefault, defaultURI)
-			require.Equal(t, localityDests, urisByLocalityKV)
+			require.Equal(t, expectedDefault, backupDestination.defaultURI)
+			require.Equal(t, localityDests, backupDestination.urisByLocalityKV)
 
-			require.Equal(t, expectedPrevBackups, prevBackupURIs)
+			require.Equal(t, expectedPrevBackups, backupDestination.prevBackupURIs)
 		}
 
 		// Initial: BACKUP INTO collection
diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 96f1bef8e889..e277e5bc5f77 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -124,7 +124,6 @@ func backup(
 	execCtx sql.JobExecContext,
 	defaultURI string,
 	urisByLocalityKV map[string]string,
-	db *kv.DB,
 	settings *cluster.Settings,
 	defaultStore cloud.ExternalStorage,
 	storageByLocalityKV map[string]*roachpb.ExternalStorage,
@@ -134,9 +133,6 @@ func backup(
 	encryption *jobspb.BackupEncryptionOptions,
 	statsCache *stats.TableStatisticsCache,
 ) (roachpb.RowCount, error) {
-	// TODO(dan): Figure out how permissions should work. #6713 is tracking this
-	// for grpc.
-
 	resumerSpan := tracing.SpanFromContext(ctx)
 
 	var lastCheckpoint time.Time
@@ -402,14 +398,71 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
 	details := b.job.Details().(jobspb.BackupDetails)
 	p := execCtx.(sql.JobExecContext)
 
-	var backupManifest *BackupManifest
+	// Resolve the backup destination. We can skip this step if we have already
+	// resolved and persisted the destination either during planning or a previous
+	// resumption of this job.
+	defaultURI := details.URI
+	var backupDest backupDestination
+	if details.URI == "" {
+		var err error
+		backupDest, err = resolveDest(ctx, p.User(), details.Destination, details.EndTime,
+			details.IncrementalFrom, p.ExecCfg())
+		if err != nil {
+			return err
+		}
+		defaultURI = backupDest.defaultURI
+	}
+
+	// The backup job needs to lay claim to the bucket it is writing to, to
+	// prevent concurrent backups from writing to the same location.
+	//
+	// If we have already locked the location, either on a previous resume of the
+	// job or during planning because `clusterversion.BackupResolutionInJob` isn't
+	// active, we do not want to lock it again.
+	foundLockFile, err := checkForBackupLockFile(ctx, p.ExecCfg(), defaultURI, b.job.ID(), p.User())
+	if err != nil {
+		return err
+	}
+
+	// TODO(adityamaru): We can delete the `details.URI == ""` check in 22.2. This
+	// is present in 22.1 to guard against the case where we have already written
+	// a BACKUP-LOCK file during planning (mixed-version cluster) and do not want
+	// to re-check and re-write the lock file. In that case `details.URI` will be non-empty.
+ if details.URI == "" && !foundLockFile { + if err := checkForPreviousBackup(ctx, p.ExecCfg(), backupDest.defaultURI, b.job.ID(), + p.User()); err != nil { + return err + } + + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_lock"); err != nil { + return err + } + + if err := writeLockOnBackupLocation(ctx, p.ExecCfg(), backupDest.defaultURI, + b.job.ID(), p.User()); err != nil { + return err + } + + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_lock"); err != nil { + return err + } + } - // If planning didn't resolve the external destination, then we need to now. + // Populate the BackupDetails with the resolved backup destination, and + // construct the BackupManifest to be written to external storage as a + // BACKUP-CHECKPOINT. We can skip this step if one of the following are true: + // + // 1) The gateway node has already resolved the backup details and manifest + // during planning. + // + // 2) The job has already persisted the resolved details and manifest in a + // prior resumption. + var backupManifest *BackupManifest if details.URI == "" { initialDetails := details backupDetails, m, err := getBackupDetailAndManifest( - ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(), - ) + ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, details, p.User(), backupDest) if err != nil { return err } @@ -446,12 +499,20 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.write_first_checkpoint"); err != nil { + return err + } + if err := writeBackupManifestCheckpoint( ctx, details.URI, details.EncryptionOptions, backupManifest, p.ExecCfg(), p.User(), ); err != nil { return err } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.write_first_checkpoint"); err != nil { + return err + } + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy()) }); err != nil { @@ -463,11 +524,13 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { // Ideally we'd re-render the description now that we know the subdir, but // we don't have backup AST node anymore to easily call the rendering func. // Instead we can just do a bit of dirty string replacement iff there is one - // "INTO 'LATEST' IN" (if there's >1, somenoe has a weird table/db names and + // "INTO 'LATEST' IN" (if there's >1, someone has a weird table/db names and // we should just leave the description as-is, since it is just for humans). description := b.job.Payload().Description const unresolvedText = "INTO 'LATEST' IN" - if initialDetails.Destination.Subdir == "LATEST" && strings.Count(description, unresolvedText) == 1 { + // Note, we are using initialDetails below which is a copy of the + // BackupDetails before destination resolution. + if initialDetails.Destination.Subdir == latestFileName && strings.Count(description, unresolvedText) == 1 { description = strings.ReplaceAll(description, unresolvedText, fmt.Sprintf("INTO '%s' IN", details.Destination.Subdir)) } @@ -486,6 +549,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.after.details_has_checkpoint"); err != nil { + return err + } + // Collect telemetry, once per backup after resolving its destination. 
lic := utilccl.CheckEnterpriseEnabled( p.ExecCfg().Settings, p.ExecCfg().LogicalClusterID(), p.ExecCfg().Organization(), "", @@ -550,7 +617,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { MaxRetries: 5, } - if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before_flow"); err != nil { + if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before.flow"); err != nil { return err } @@ -570,7 +637,6 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { p, details.URI, details.URIsByLocalityKV, - p.ExecCfg().DB, p.ExecCfg().Settings, defaultStore, storageByLocalityKV, diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 768f20470248..609616d0598a 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -892,14 +892,50 @@ func backupPlanHook( } // TODO(dt): delete this in 22.2. + backupDest, err := resolveDest(ctx, p.User(), initialDetails.Destination, initialDetails.EndTime, + initialDetails.IncrementalFrom, p.ExecCfg()) + if err != nil { + return err + } + + // The backup job needs to lay a claim on the bucket it is writing to, to + // prevent concurrent backups from writing to the same location. + // + // We choose the nodeID planning the backup as the suffix of the lock + // file, so that other nodes running 22.1 binaries in this cluster will + // not be able to backup to the same location. Ideally, we would have used + // `jobID` but this can change across txn restarts, and so we could end up + // in a situation where the txn restarts and checkForPreviousBackup + // considers the LOCK file we wrote with the "old" jobID (in the previous + // txn attempt) as a LOCK file written by another backup. Effectively, + // locking ourselves out. + // + // The nodeID is static across txn restarts. + nodeID := jobspb.JobID(p.ExecCfg().NodeID.SQLInstanceID()) + foundLockFile, err := checkForBackupLockFile(ctx, p.ExecCfg(), backupDest.defaultURI, nodeID, p.User()) + if err != nil { + return err + } + if !foundLockFile { + if err := checkForPreviousBackup(ctx, p.ExecCfg(), backupDest.defaultURI, nodeID, + p.User()); err != nil { + return err + } + + if err := writeLockOnBackupLocation(ctx, p.ExecCfg(), backupDest.defaultURI, nodeID, + p.User()); err != nil { + return err + } + } + backupDetails, backupManifest, err := getBackupDetailAndManifest( - ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(), - ) + ctx, p.ExecCfg(), p.ExtendedEvalContext().Txn, initialDetails, p.User(), backupDest) if err != nil { return err } - description, err := backupJobDescription(p, backupStmt.Backup, to, incrementalFrom, encryptionParams.RawKmsUris, backupDetails.Destination.Subdir, initialDetails.Destination.IncrementalStorage) + description, err := backupJobDescription(p, backupStmt.Backup, to, incrementalFrom, + encryptionParams.RawKmsUris, backupDetails.Destination.Subdir, initialDetails.Destination.IncrementalStorage) if err != nil { return err } @@ -1193,7 +1229,7 @@ func writeBackupManifestCheckpoint( // the checkpoints and pick the file whose name is lexicographically // sorted to the top. This will be the last checkpoint we write, for // details refer to newTimestampedCheckpointFileName. 
- filename := newTimestampedCheckpointFileName() + filename := newTimestampedFilename(backupManifestCheckpointName) // HTTP storage does not support listing and so we cannot rely on the // above-mentioned List method to return us the latest checkpoint file. @@ -1644,12 +1680,19 @@ func checkForNewTables( return nil } +// getBackupDetailsAndManifest populates the backup job's `BackupDetails` and +// `BackupManifest` with relevant metadata. +// +// Note, in 22.1 this method is called from either backup planning or backup job +// execution depending on the cluster version. If it has been called during +// backup planning, it will not be re-invoked during job execution. func getBackupDetailAndManifest( ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, initialDetails jobspb.BackupDetails, user security.SQLUsername, + backupDestination backupDestination, ) (jobspb.BackupDetails, BackupManifest, error) { makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI @@ -1678,31 +1721,13 @@ func getBackupDetailAndManifest( } } - // TODO(pbardea): Refactor (defaultURI and urisByLocalityKV) pairs into a - // backupDestination struct. - collectionURI, defaultURI, resolvedSubdir, urisByLocalityKV, prevs, err := - resolveDest(ctx, user, initialDetails.Destination, endTime, initialDetails.IncrementalFrom, execCfg) - if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err - } - - defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user) - if err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err - } - defer defaultStore.Close() - - if err := checkForPreviousBackup(ctx, defaultStore, defaultURI); err != nil { - return jobspb.BackupDetails{}, BackupManifest{}, err - } - kmsEnv := &backupKMSEnv{settings: execCfg.Settings, conf: &execCfg.ExternalIODirConfig} mem := execCfg.RootMemoryMonitor.MakeBoundAccount() defer mem.Close(ctx) prevBackups, encryptionOptions, memSize, err := fetchPreviousBackups(ctx, &mem, user, - makeCloudStorage, prevs, *initialDetails.EncryptionOptions, kmsEnv) + makeCloudStorage, backupDestination.prevBackupURIs, *initialDetails.EncryptionOptions, kmsEnv) if err != nil { return jobspb.BackupDetails{}, BackupManifest{}, err } @@ -1891,14 +1916,14 @@ func getBackupDetailAndManifest( } return jobspb.BackupDetails{ - Destination: jobspb.BackupDetails_Destination{Subdir: resolvedSubdir}, + Destination: jobspb.BackupDetails_Destination{Subdir: backupDestination.chosenSubdir}, StartTime: startTime, EndTime: endTime, - URI: defaultURI, - URIsByLocalityKV: urisByLocalityKV, + URI: backupDestination.defaultURI, + URIsByLocalityKV: backupDestination.urisByLocalityKV, EncryptionOptions: encryptionOptions, EncryptionInfo: encryptionInfo, - CollectionURI: collectionURI, + CollectionURI: backupDestination.collectionURI, }, backupManifest, nil } diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 69e07c9a9d0f..3999c524f409 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1604,6 +1604,9 @@ func TestBackupRestoreResume(t *testing.T) { } createAndWaitForJob( t, sqlDB, []descpb.ID{backupTableDesc.GetID()}, + // We set the URI and HasLockedLocation to prevent the job from + // running the logic that resolves the backup destination. Since this + // is a "fake" job record, that step is bound to fail. 
jobspb.BackupDetails{ EndTime: tc.Servers[0].Clock().Now(), URI: "nodelocal://0/backup" + "-" + item.testName, @@ -3495,10 +3498,10 @@ func TestBackupTenantsWithRevisionHistory(t *testing.T) { const msg = "can not backup tenants with revision history" - _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/' WITH revision_history`) + _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO 'nodelocal://0/foo' WITH revision_history`) require.Contains(t, fmt.Sprint(err), msg) - _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/' WITH revision_history`) + _, err = sqlDB.DB.ExecContext(ctx, `BACKUP TO 'nodelocal://0/bar' WITH revision_history`) require.Contains(t, fmt.Sprint(err), msg) } @@ -4034,28 +4037,28 @@ func TestTimestampMismatch(t *testing.T) { sqlDB.ExpectErr( t, "backups listed out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2`, - localFoo, incrementalT1FromFull, + localFoo+"/missing-initial", incrementalT1FromFull, ) // Missing an intermediate incremental backup. sqlDB.ExpectErr( t, "backups listed out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, fullBackup, incrementalT2FromT1, + localFoo+"/missing-inc", fullBackup, incrementalT2FromT1, ) // Backups specified out of order. sqlDB.ExpectErr( t, "out of order", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, incrementalT1FromFull, fullBackup, + localFoo+"/ooo", incrementalT1FromFull, fullBackup, ) // Missing data for one table in the most recent backup. sqlDB.ExpectErr( t, "previous backup does not contain table", `BACKUP DATABASE data TO $1 INCREMENTAL FROM $2, $3`, - localFoo, fullBackup, incrementalT3FromT1OneTable, + localFoo+"/missing-table-data", fullBackup, incrementalT3FromT1OneTable, ) }) @@ -5947,7 +5950,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) { // Kickoff an incremental backup, but pause it just after it writes its // protected timestamps. - runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`) + runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`) var jobID int runner.QueryRow(t, @@ -6415,7 +6418,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) { // Creating the protected timestamp record should fail because there are too // many spans. Ensure that we get the appropriate error. - _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`) + _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/byte-limit'`) require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes") // TODO(adityamaru): Remove in 22.2 once no records protect spans. @@ -6434,7 +6437,7 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) { // Creating the protected timestamp record should fail because there are too // many spans. Ensure that we get the appropriate error. 
-		_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
+		_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo/spans-limit'`)
 		require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
 	})
 }
@@ -9270,7 +9273,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 		return nil
 	})
 
-	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
+	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/fail")
 	testutils.IsError(err, "must be after replica GC threshold")
 
 	_, err = conn.Exec(`ALTER TABLE foo SET (exclude_data_from_backup = true)`)
@@ -9282,7 +9285,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
 		return true, nil
 	})
 
-	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
+	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo+"/succeed")
 	require.NoError(t, err)
 }
 
@@ -9356,7 +9359,7 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
 		return true, nil
 	})
 
-	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
+	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`)
 	if _, err := conn.Exec(`BACKUP DATABASE test INTO $1`, localFoo); !testutils.IsError(err, "pause") {
 		t.Fatal(err)
 	}
@@ -9802,7 +9805,7 @@ func TestBackupTimestampedCheckpointsAreLexicographical(t *testing.T) {
 	numCheckpoints := 5
 
 	for i := 0; i < numCheckpoints; i++ {
-		checkpoints = append(checkpoints, newTimestampedCheckpointFileName())
+		checkpoints = append(checkpoints, newTimestampedFilename(backupManifestCheckpointName))
 		// Occasionally, we call newTimestampedCheckpointFileName() in succession
 		// too fast and the timestamp is the same. So wait for a moment to
 		// avoid that.
diff --git a/pkg/ccl/backupccl/datadriven_test.go b/pkg/ccl/backupccl/datadriven_test.go
index 4f0d7c72aca0..e6c755742fcc 100644
--- a/pkg/ccl/backupccl/datadriven_test.go
+++ b/pkg/ccl/backupccl/datadriven_test.go
@@ -263,6 +263,9 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
 //
 // Supported arguments:
 //
+// + expect-error-regex=<regex>: expects the query to return an error with a string
+// matching the provided regex
+//
 // + expect-error-ignore: expects the query to return an error, but we will
 // ignore it.
 //
@@ -277,9 +280,15 @@ func (d *datadrivenTestState) getSQLDB(t *testing.T, server string, user string)
 //
 // Supported arguments:
 //
+// + resume=<tag>: resumes the job referenced by the tag, use in conjunction
+// with wait-for-state.
+//
 // + cancel=<tag>: cancels the job referenced by the tag and waits for it to
 // reach a CANCELED state.
 //
+// + wait-for-state=<succeeded|cancelled|paused|failed> tag=<tag>: wait for
+// the job referenced by the tag to reach the specified state.
+//
 // - "save-cluster-ts" tag=<tag>
 //   Saves the `SELECT cluster_logical_timestamp()` with the tag. Can be used
 //   in the future with intstructions such as `aost`.
@@ -434,6 +443,17 @@ func TestDataDriven(t *testing.T) {
 				return strings.Join(ret, "\n")
 			}
 
+			// Check if we are expecting an error, and want to match it against a
+			// regex.
+ if d.HasArg("expect-error-regex") { + require.NotNilf(t, err, "expected error") + var expectErrorRegex string + d.ScanArgs(t, "expect-error-regex", &expectErrorRegex) + testutils.IsError(err, expectErrorRegex) + ret = append(ret, "regex matches error") + return strings.Join(ret, "\n") + } + // Check for other errors. if err != nil { if pqErr := (*pq.Error)(nil); errors.As(err, &pqErr) { @@ -610,6 +630,39 @@ func TestDataDriven(t *testing.T) { runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) runner.Exec(t, `CANCEL JOB $1`, jobID) jobutils.WaitForJobToCancel(t, runner, jobID) + } else if d.HasArg("resume") { + var resumeJobTag string + d.ScanArgs(t, "resume", &resumeJobTag) + var jobID jobspb.JobID + var ok bool + if jobID, ok = ds.jobTags[resumeJobTag]; !ok { + t.Fatalf("could not find job with tag %s", resumeJobTag) + } + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + runner.Exec(t, `RESUME JOB $1`, jobID) + } else if d.HasArg("wait-for-state") { + var tag string + d.ScanArgs(t, "tag", &tag) + var jobID jobspb.JobID + var ok bool + if jobID, ok = ds.jobTags[tag]; !ok { + t.Fatalf("could not find job with tag %s", tag) + } + runner := sqlutils.MakeSQLRunner(ds.getSQLDB(t, server, user)) + var state string + d.ScanArgs(t, "wait-for-state", &state) + switch state { + case "succeeded": + jobutils.WaitForJobToSucceed(t, runner, jobID) + case "cancelled": + jobutils.WaitForJobToCancel(t, runner, jobID) + case "paused": + jobutils.WaitForJobToPause(t, runner, jobID) + case "failed": + jobutils.WaitForJobToFail(t, runner, jobID) + default: + t.Fatalf("unknown state %s", state) + } } return "" diff --git a/pkg/ccl/backupccl/manifest_handling.go b/pkg/ccl/backupccl/manifest_handling.go index 2cec04ae8e06..acb0b72b53ae 100644 --- a/pkg/ccl/backupccl/manifest_handling.go +++ b/pkg/ccl/backupccl/manifest_handling.go @@ -19,6 +19,7 @@ import ( "path" "regexp" "sort" + "strconv" "strings" "github.com/cockroachdb/cockroach/pkg/base" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -38,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -70,6 +73,9 @@ const ( // backupEncryptionInfoFile is the file name used to store the serialized // EncryptionInfo proto while the backup is in progress. backupEncryptionInfoFile = "ENCRYPTION-INFO" + // backupLockFile is the prefix of the file name used by the backup job to + // lock the bucket from running concurrent backups to the same destination. 
+	backupLockFilePrefix = "BACKUP-LOCK-"
 )
 
 const (
@@ -1207,15 +1213,98 @@ func RedactURIForErrorMessage(uri string) string {
 	return redactedURI
 }
 
+// writeLockOnBackupLocation is responsible for writing a job ID suffixed
+// `BACKUP-LOCK` file that will prevent concurrent backups from writing to the
+// same location.
+func writeLockOnBackupLocation(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	defaultURI string,
+	jobID jobspb.JobID,
+	user security.SQLUsername,
+) error {
+	defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user)
+	if err != nil {
+		return err
+	}
+	defer defaultStore.Close()
+
+	// The lock file name consists of two parts `BACKUP-LOCK-<jobID>`.
+	//
+	// The jobID is used in `checkForPreviousBackup` to ensure that we do not
+	// read our own lock file on job resumption.
+	lockFileName := fmt.Sprintf("%s%s", backupLockFilePrefix, strconv.FormatInt(int64(jobID), 10))
+
+	return cloud.WriteFile(ctx, defaultStore, lockFileName, bytes.NewReader([]byte("lock")))
+}
+
+// checkForBackupLockFile returns true if it finds a `BACKUP-LOCK-<jobID>` file
+// at `defaultURI`.
+func checkForBackupLockFile(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	defaultURI string,
+	jobID jobspb.JobID,
+	user security.SQLUsername,
+) (bool, error) {
+	defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user)
+	if err != nil {
+		return false, err
+	}
+	defer defaultStore.Close()
+
+	// Check for the existence of a BACKUP-LOCK file written by our job
+	// corresponding to `jobID`. If present, we have already laid claim on the
+	// location and do not need to check further.
+	lockFileName := fmt.Sprintf("%s%s", backupLockFilePrefix, strconv.FormatInt(int64(jobID), 10))
+	r, err := defaultStore.ReadFile(ctx, lockFileName)
+	if err == nil {
+		r.Close(ctx)
+		return true, nil
+	} else if errors.Is(err, cloud.ErrFileDoesNotExist) {
+		return false, nil
+	}
+
+	return false, err
+}
+
 // checkForPreviousBackup ensures that the target location does not already
-// contain a BACKUP or checkpoint, locking out accidental concurrent operations
-// on that location. Note that the checkpoint file should be written as soon as
-// the job actually starts.
+// contain a previous or concurrently running backup. It does this by checking
+// for the existence of one of:
+//
+// 1) BACKUP_MANIFEST: Written on completion of a backup.
+//
+// 2) BACKUP-LOCK: Written by the coordinator node to lay claim on a backup
+// location. This file is suffixed with the ID of the backup job to prevent a
+// node from reading its own lock file on job resumption.
+//
+// 3) BACKUP-CHECKPOINT: Prior to 22.1.1, nodes would use the BACKUP-CHECKPOINT
+// to lay claim on a backup location. To account for a mixed-version cluster
+// where an older coordinator node may be running a concurrent backup to the
+// same location, we must continue to check for a BACKUP-CHECKPOINT file.
+//
+// NB: The node will continue to write a BACKUP-CHECKPOINT file later in its
+// execution, but we do not have to worry about reading our own
+// BACKUP-CHECKPOINT file (and locking ourselves out) since
+// `checkForPreviousBackup` is invoked as the first step on job resumption, and
+// is not called again.
 func checkForPreviousBackup(
-	ctx context.Context, exportStore cloud.ExternalStorage, defaultURI string,
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	defaultURI string,
+	jobID jobspb.JobID,
+	user security.SQLUsername,
 ) error {
+	defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, defaultURI, user)
+	if err != nil {
+		return err
+	}
+	defer defaultStore.Close()
+
 	redactedURI := RedactURIForErrorMessage(defaultURI)
-	r, err := exportStore.ReadFile(ctx, backupManifestName)
+
+	// Check for the presence of a BACKUP_MANIFEST.
+	r, err := defaultStore.ReadFile(ctx, backupManifestName)
 	if err == nil {
 		r.Close(ctx)
 		return pgerror.Newf(pgcode.FileAlreadyExists,
@@ -1229,7 +1318,34 @@
 			redactedURI, backupManifestName)
 	}
 
-	r, err = readLatestCheckpointFile(ctx, exportStore, backupManifestCheckpointName)
+	// Check for the presence of a BACKUP-LOCK file with a job ID different from
+	// that of our job.
+	if err := defaultStore.List(ctx, "", "", func(s string) error {
+		s = strings.TrimPrefix(s, "/")
+		if strings.HasPrefix(s, backupLockFilePrefix) {
+			jobIDSuffix := strings.TrimPrefix(s, backupLockFilePrefix)
+			if len(jobIDSuffix) == 0 {
+				return errors.AssertionFailedf("malformed BACKUP-LOCK file %s, expected a job ID suffix", s)
+			}
+			if jobIDSuffix != strconv.FormatInt(int64(jobID), 10) {
+				return pgerror.Newf(pgcode.FileAlreadyExists,
+					"%s already contains a `BACKUP-LOCK` file written by job %s",
+					redactedURI, jobIDSuffix)
+			}
+		}
+		return nil
+	}); err != nil {
+		// HTTP external storage does not support listing, and so we skip checking
+		// for a BACKUP-LOCK file.
+		if !errors.Is(err, cloud.ErrListingUnsupported) {
+			return errors.Wrap(err, "checking for BACKUP-LOCK file")
+		}
+		log.Warningf(ctx, "external storage %s does not support listing: skip checking for BACKUP_LOCK", redactedURI)
+	}
+
+	// Check for a BACKUP-CHECKPOINT that might have been written by a node
+	// running a pre-22.1.1 binary.
+	r, err = readLatestCheckpointFile(ctx, defaultStore, backupManifestCheckpointName)
 	if err == nil {
 		r.Close(ctx)
 		return pgerror.Newf(pgcode.FileAlreadyExists,
@@ -1329,17 +1445,15 @@ func readLatestCheckpointFile(
 		return nil, errors.Wrapf(err, "%s could not be read in the base or progress directory", filename)
 	}
 	return r, nil
-
 }
 
-// newTimestampedCheckpointFileName returns a string of a new checkpoint filename
-// with a suffixed version. It returns it in the format of BACKUP-CHECKPOINT-<version>
-// where version is a hex encoded one's complement of the timestamp.
-// This means that as long as the supplied timestamp is correct, the filenames
-// will adhere to a lexicographical/utf-8 ordering such that the most
-// recent file is at the top.
-func newTimestampedCheckpointFileName() string {
+// newTimestampedFilename returns a filename with a suffixed version. This new
+// filename is in the format of filename-<version> where version is a hex
+// encoded one's complement of the timestamp. This means that as long as the
+// supplied timestamp is correct, the filenames will adhere to a
+// lexicographical/utf-8 ordering such that the most recent file is at the top.
+func newTimestampedFilename(filename string) string { var buffer []byte buffer = encoding.EncodeStringDescending(buffer, timeutil.Now().String()) - return fmt.Sprintf("%s-%s", backupManifestCheckpointName, hex.EncodeToString(buffer)) + return fmt.Sprintf("%s-%s", filename, hex.EncodeToString(buffer)) } diff --git a/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups b/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups new file mode 100644 index 000000000000..2aa38aeb8d45 --- /dev/null +++ b/pkg/ccl/backupccl/testdata/backup-restore/lock-concurrent-backups @@ -0,0 +1,137 @@ +new-server name=s1 +---- + +# Test that a backup job does not read its own lock file on resumption, +# effectively locking itself out. We pause the job after it has written its +# BACKUP-LOCK file and then resume it to ensure we don't read our own write. +subtest backup-does-not-read-its-own-lock + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_lock'; +---- + +backup expect-pausepoint tag=a +BACKUP INTO 'userfile://defaultdb.public.foo/foo'; +---- +job paused at pausepoint + +# The job should have written a `BACKUP-LOCK` file suffixed with a job ID and a +# timestamp. +query-sql +SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.foo_upload_files; +---- +BACKUP-LOCK + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +---- + +# Resume the job and expect it to succeed. +job resume=a +---- + +job tag=a wait-for-state=succeeded +---- + +subtest end + + +# Test that a backup job on resume will not rewrite the `BACKUP-LOCK` file if it +# already sees one, thus maintaining write-once semantics. +subtest backup-lock-is-write-once + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.write_first_checkpoint'; +---- + +backup expect-pausepoint tag=b +BACKUP INTO 'userfile://defaultdb.public.bar/bar'; +---- +job paused at pausepoint + +# The job should have written a `BACKUP-LOCK` file suffixed with a job ID and a +# timestamp. +query-sql +SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.bar_upload_files; +---- +BACKUP-LOCK + +# Resume the job and expect it to pause again after writing `BACKUP-LOCK` again. +job resume=b +---- + +job tag=b wait-for-state=paused +---- + +# We expect to see only one lock file since the resumed job would see the +# previously written one. +query-sql +SELECT regexp_replace(filename, '.*BACKUP-LOCK-[0-9]+$', 'BACKUP-LOCK') FROM defaultdb.public.bar_upload_files; +---- +BACKUP-LOCK + +exec-sql +SET CLUSTER SETTING jobs.debug.pausepoints = ''; +---- + +# Resume the job and expect it to succeed. +job resume=b +---- + +job tag=b wait-for-state=succeeded +---- + +subtest end + +# Note, `BACKUP TO` is going away, and `BACKUP INTO` picks a timestamped +# directory making it *impossible* for two backups to write to the same +# directory in the future. +# +# Backup should fail if it sees a BACKUP_LOCK in the bucket. 
+subtest backup-lock-file-prevents-concurrent-backups
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow';
+----
+
+backup expect-pausepoint
+BACKUP TO 'userfile://defaultdb.public.baz/baz';
+----
+job paused at pausepoint
+
+exec-sql expect-error-regex='userfile://defaultdb.public.baz/baz already contains a `BACKUP-LOCK`'
+BACKUP TO 'userfile://defaultdb.public.baz/baz';
+----
+regex matches error
+
+subtest end
+
+# For mixed-version compatibility the backup job also checks for a
+# `BACKUP-CHECKPOINT` file when ensuring that there are no concurrent backups
+# writing to the same bucket.
+#
+# This test ensures that a backup job does not check for a `BACKUP-CHECKPOINT`
+# lock file after writing its own `BACKUP-CHECKPOINT`.
+subtest backup-does-not-read-its-own-checkpoint
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.after.write_first_checkpoint';
+----
+
+backup expect-pausepoint tag=d
+BACKUP TO 'userfile://defaultdb.public.bat/bat';
+----
+job paused at pausepoint
+
+exec-sql
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+----
+
+# Resume the job and expect it to succeed.
+job resume=d
+----
+
+job tag=d wait-for-state=succeeded
+----
+
+subtest end
diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go
index 01f14cb4aa85..0a77b13eb6d1 100644
--- a/pkg/cmd/roachtest/tests/backup.go
+++ b/pkg/cmd/roachtest/tests/backup.go
@@ -207,7 +207,7 @@ func waitForJobToHaveStatus(
 	expectedStatus jobs.Status,
 	nodesWithAdoptionDisabled option.NodeListOption,
 ) {
-	if err := retry.ForDuration(time.Minute*2, func() error {
+	if err := retry.ForDuration(time.Minute*1, func() error {
 		// TODO(adityamaru): This is unfortunate and can be deleted once
 		// https://github.com/cockroachdb/cockroach/pull/79666 is backported to
 		// 21.2 and the mixed version map for roachtests is bumped to the 21.2
@@ -572,6 +572,84 @@ func registerBackupMixedVersion(r registry.Registry) {
 			u.run(ctx, t)
 		},
 	})
+
+	// TODO(adityamaru): Delete in 22.2 since nodes will no longer rely on
+	// BACKUP-CHECKPOINT to lock their location.
+	r.Add(registry.TestSpec{
+		Name:              "backup/mixed-version-concurrent-backups",
+		Owner:             registry.OwnerBulkIO,
+		Cluster:           r.MakeClusterSpec(4),
+		EncryptionSupport: registry.EncryptionMetamorphic,
+		RequiresLicense:   true,
+		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+			// An empty string means that the cockroach binary specified by flag
+			// `cockroach` will be used.
+			const mainVersion = ""
+			roachNodes := c.All()
+			upgradedNodes := c.Nodes(1, 2)
+			oldNodes := c.Nodes(3, 4)
+			predV, err := PredecessorVersion(*t.BuildVersion())
+			require.NoError(t, err)
+			c.Put(ctx, t.DeprecatedWorkload(), "./workload")
+
+			u := newVersionUpgradeTest(c,
+				uploadAndStartFromCheckpointFixture(roachNodes, predV),
+				waitForUpgradeStep(roachNodes),
+				preventAutoUpgradeStep(1),
+				setShortJobIntervalsStep(1),
+				loadBackupDataStep(c),
+				// Upgrade a node.
+				binaryUpgradeStep(upgradedNodes, mainVersion),
+				disableJobAdoptionStep(c, oldNodes),
+
+				// Plan and run a backup on an upgraded node.
+				//
+				// Since the cluster is in a mixed-version state, the backup will lock
+				// its bucket, and perform destination resolution during planning.
+ func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + t.L().Printf("running and pausing BACKUP on upgraded node") + gatewayDB := c.Conn(ctx, t.L(), upgradedNodes[0]) + defer gatewayDB.Close() + _, err := gatewayDB.Exec(`SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before.flow'`) + require.NoError(t, err) + var jobID jobspb.JobID + err = gatewayDB.QueryRow(`BACKUP TABLE bank.bank TO 'userfile:///upgraded-node-checkpoint' WITH detached `).Scan(&jobID) + require.NoError(t, err) + waitForJobToHaveStatus(ctx, t, gatewayDB, jobID, jobs.StatusPaused, oldNodes) + _, err = gatewayDB.Exec(`SET CLUSTER SETTING jobs.debug.pausepoints = ''`) + require.NoError(t, err) + }, + + // Now, run a backup to the same location but from a 21.2 node. This + // backup should fail because it sees a BACKUP-CHECKPOINT written by the + // previous backup job. + enableJobAdoptionStep(c, oldNodes), + func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + t.L().Printf("running BACKUP to the same location on old node; waiting for failure") + gatewayDB := c.Conn(ctx, t.L(), oldNodes[0]) + defer gatewayDB.Close() + var jobID jobspb.JobID + err := gatewayDB.QueryRow(`BACKUP TABLE bank.bank TO 'userfile:///upgraded-node-checkpoint' WITH detached `).Scan(&jobID) + testutils.IsError(err, "userfile:///upgraded-node-checkpoint already contains a BACKUP-CHECKPOINT file (is another operation already in progress?)") + }, + disableJobAdoptionStep(c, oldNodes), + // Resume the paused backup job and get an upgraded node to complete it. + func(ctx context.Context, t test.Test, u *versionUpgradeTest) { + t.L().Printf("resuming BACKUP on upgraded node; waiting for success") + gatewayDB := c.Conn(ctx, t.L(), upgradedNodes[0]) + defer gatewayDB.Close() + var jobID jobspb.JobID + err = gatewayDB.QueryRow(`SELECT job_id FROM [SHOW JOBS] WHERE status = 'paused' AND job_type = 'BACKUP'`).Scan(&jobID) + require.NoError(t, err) + _, err = gatewayDB.ExecContext(ctx, `RESUME JOB $1`, jobID) + require.NoError(t, err) + waitForJobToHaveStatus(ctx, t, gatewayDB, jobID, jobs.StatusSucceeded, oldNodes) + }, + ) + + u.run(ctx, t) + }, + }) } // initBulkJobPerfArtifacts registers a histogram, creates a performance diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index 8f9a2b84e361..c67a94253132 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -50,6 +50,12 @@ func WaitForJobToCancel(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID waitForJobToHaveStatus(t, db, jobID, jobs.StatusCanceled) } +// WaitForJobToFail waits for the specified job ID to be in a failed state. +func WaitForJobToFail(t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID) { + t.Helper() + waitForJobToHaveStatus(t, db, jobID, jobs.StatusFailed) +} + func waitForJobToHaveStatus( t testing.TB, db *sqlutils.SQLRunner, jobID jobspb.JobID, expectedStatus jobs.Status, ) {