diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index cf56756e86bd..e9bac995d8d5 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -57,18 +58,11 @@ import ( // BackupCheckpointInterval is the interval at which backup progress is saved // to durable storage. -var BackupCheckpointInterval = time.Minute - -// TestingShortBackupCheckpointInterval sets the BackupCheckpointInterval -// to a shorter interval for testing purposes, so we can see multiple -// checkpoints written without having extremely large backups. It returns -// a function which resets the checkpoint interval to the old interval. -func TestingShortBackupCheckpointInterval(oldInterval time.Duration) func() { - BackupCheckpointInterval = time.Millisecond * 10 - return func() { - BackupCheckpointInterval = oldInterval - } -} +var BackupCheckpointInterval = settings.RegisterDurationSetting( + settings.TenantWritable, + "bulkio.backup.checkpoint_interval", + "the minimum time between writing progress checkpoints during a backup", + time.Minute) var forceReadBackupManifest = util.ConstantWithMetamorphicTestBool("backup-read-manifest", false) @@ -240,22 +234,22 @@ func backup( for i := int32(0); i < progDetails.CompletedSpans; i++ { requestFinishedCh <- struct{}{} } - if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval { + + interval := BackupCheckpointInterval.Get(&execCtx.ExecCfg().Settings.SV) + if timeutil.Since(lastCheckpoint) > interval { resumerSpan.RecordStructured(&backuppb.BackupProgressTraceEvent{ TotalNumFiles: numBackedUpFiles, TotalEntryCounts: backupManifest.EntryCounts, RevisionStartTime: backupManifest.RevisionStartTime, }) - lastCheckpoint = timeutil.Now() - err := writeBackupManifestCheckpoint( ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(), ) if err != nil { log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err) } - + lastCheckpoint = timeutil.Now() if execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint != nil { execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint() } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 9067b239095c..ff6bc7947b21 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -799,6 +799,10 @@ func writeBackupManifestCheckpoint( execCfg *sql.ExecutorConfig, user username.SQLUsername, ) error { + var span *tracing.Span + ctx, span = tracing.ChildSpan(ctx, "write-backup-manifest-checkpoint") + defer span.Finish() + defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user) if err != nil { return err diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index bb2e028b66cb..661d8b65099d 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1472,62 +1472,6 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) { checkInProgressBackupRestore(t, checkFraction, checkFraction) } -func TestBackupRestoreCheckpointing(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - skip.WithIssue(t, 33357) - - defer func(oldInterval time.Duration) { - BackupCheckpointInterval = oldInterval - }(BackupCheckpointInterval) - BackupCheckpointInterval = 0 - - var checkpointPath string - - checkBackup := func(ctx context.Context, ip inProgressState) error { - checkpointPath = filepath.Join(ip.dir, ip.name, backupProgressDirectory+"/"+backupManifestCheckpointName) - checkpointDescBytes, err := ioutil.ReadFile(checkpointPath) - if err != nil { - return errors.Wrap(err, "error while reading checkpoint") - } - var checkpointDesc backuppb.BackupManifest - if err := protoutil.Unmarshal(checkpointDescBytes, &checkpointDesc); err != nil { - return errors.Wrap(err, "error while unmarshalling checkpoint") - } - if len(checkpointDesc.Files) == 0 { - return errors.Errorf("empty backup checkpoint descriptor") - } - return nil - } - - checkRestore := func(ctx context.Context, ip inProgressState) error { - jobID, err := ip.latestJobID() - if err != nil { - return err - } - highWaterMark, err := getHighWaterMark(jobID, ip.DB) - if err != nil { - return err - } - low := keys.SystemSQLCodec.TablePrefix(ip.backupTableID) - high := keys.SystemSQLCodec.TablePrefix(ip.backupTableID + 1) - if bytes.Compare(highWaterMark, low) <= 0 || bytes.Compare(highWaterMark, high) >= 0 { - return errors.Errorf("expected high-water mark %v to be between %v and %v", - highWaterMark, low, high) - } - return nil - } - - checkInProgressBackupRestore(t, checkBackup, checkRestore) - - if _, err := os.Stat(checkpointPath); err == nil { - t.Fatalf("backup checkpoint descriptor at %s not cleaned up", checkpointPath) - } else if !oserror.IsNotExist(err) { - t.Fatal(err) - } -} - func createAndWaitForJob( t *testing.T, db *sqlutils.SQLRunner, @@ -1702,25 +1646,6 @@ func TestBackupRestoreResume(t *testing.T) { }) } -func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) { - var progressBytes []byte - if err := sqlDB.QueryRow( - `SELECT progress FROM system.jobs WHERE id = $1`, jobID, - ).Scan(&progressBytes); err != nil { - return nil, err - } - var payload jobspb.Progress - if err := protoutil.Unmarshal(progressBytes, &payload); err != nil { - return nil, err - } - switch d := payload.Details.(type) { - case *jobspb.Progress_Restore: - return d.Restore.HighWater, nil - default: - return nil, errors.Errorf("unexpected job details type %T", d) - } -} - // TestBackupRestoreControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB // work as intended on backup and restore jobs. func TestBackupRestoreControlJob(t *testing.T) { @@ -9815,11 +9740,6 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // The regular interval is a minute which would require us to take a - // very large backup in order to get more than one checkpoint. Instead, - // lower the interval and change it back to normal after the test. - resetCheckpointInterval := TestingShortBackupCheckpointInterval(BackupCheckpointInterval) - defer resetCheckpointInterval() var numCheckpointsWritten int // Set the testing knob so we count each time we write a checkpoint. @@ -9840,6 +9760,10 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) { tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params) defer cleanupFn() + // The regular interval is a minute which would require us to take a + // very large backup in order to get more than one checkpoint. + sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'") + query := fmt.Sprintf("BACKUP INTO %s", userfile) sqlDB.Exec(t, query)