Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: make checkpoint interval configurable #83151

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -57,18 +58,11 @@ import (

// BackupCheckpointInterval is the interval at which backup progress is saved
// to durable storage.
var BackupCheckpointInterval = time.Minute

// TestingShortBackupCheckpointInterval sets the BackupCheckpointInterval
// to a shorter interval for testing purposes, so we can see multiple
// checkpoints written without having extremely large backups. It returns
// a function which resets the checkpoint interval to the old interval.
func TestingShortBackupCheckpointInterval(oldInterval time.Duration) func() {
BackupCheckpointInterval = time.Millisecond * 10
return func() {
BackupCheckpointInterval = oldInterval
}
}
var BackupCheckpointInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
"bulkio.backup.checkpoint_interval",
"the minimum time between writing progress checkpoints during a backup",
time.Minute)

var forceReadBackupManifest = util.ConstantWithMetamorphicTestBool("backup-read-manifest", false)

Expand Down Expand Up @@ -240,22 +234,22 @@ func backup(
for i := int32(0); i < progDetails.CompletedSpans; i++ {
requestFinishedCh <- struct{}{}
}
if timeutil.Since(lastCheckpoint) > BackupCheckpointInterval {

interval := BackupCheckpointInterval.Get(&execCtx.ExecCfg().Settings.SV)
if timeutil.Since(lastCheckpoint) > interval {
resumerSpan.RecordStructured(&backuppb.BackupProgressTraceEvent{
TotalNumFiles: numBackedUpFiles,
TotalEntryCounts: backupManifest.EntryCounts,
RevisionStartTime: backupManifest.RevisionStartTime,
})

lastCheckpoint = timeutil.Now()

err := writeBackupManifestCheckpoint(
ctx, defaultURI, encryption, backupManifest, execCtx.ExecCfg(), execCtx.User(),
)
if err != nil {
log.Errorf(ctx, "unable to checkpoint backup descriptor: %+v", err)
}

lastCheckpoint = timeutil.Now()
if execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint != nil {
execCtx.ExecCfg().TestingKnobs.AfterBackupCheckpoint()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ func writeBackupManifestCheckpoint(
execCfg *sql.ExecutorConfig,
user username.SQLUsername,
) error {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "write-backup-manifest-checkpoint")
defer span.Finish()

defaultStore, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, storageURI, user)
if err != nil {
return err
Expand Down
84 changes: 4 additions & 80 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,62 +1472,6 @@ func TestBackupRestoreSystemJobsProgress(t *testing.T) {
checkInProgressBackupRestore(t, checkFraction, checkFraction)
}

func TestBackupRestoreCheckpointing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 33357)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test has been skipped since 2019 and this issue has been closed.


defer func(oldInterval time.Duration) {
BackupCheckpointInterval = oldInterval
}(BackupCheckpointInterval)
BackupCheckpointInterval = 0

var checkpointPath string

checkBackup := func(ctx context.Context, ip inProgressState) error {
checkpointPath = filepath.Join(ip.dir, ip.name, backupProgressDirectory+"/"+backupManifestCheckpointName)
checkpointDescBytes, err := ioutil.ReadFile(checkpointPath)
if err != nil {
return errors.Wrap(err, "error while reading checkpoint")
}
var checkpointDesc backuppb.BackupManifest
if err := protoutil.Unmarshal(checkpointDescBytes, &checkpointDesc); err != nil {
return errors.Wrap(err, "error while unmarshalling checkpoint")
}
if len(checkpointDesc.Files) == 0 {
return errors.Errorf("empty backup checkpoint descriptor")
}
return nil
}

checkRestore := func(ctx context.Context, ip inProgressState) error {
jobID, err := ip.latestJobID()
if err != nil {
return err
}
highWaterMark, err := getHighWaterMark(jobID, ip.DB)
if err != nil {
return err
}
low := keys.SystemSQLCodec.TablePrefix(ip.backupTableID)
high := keys.SystemSQLCodec.TablePrefix(ip.backupTableID + 1)
if bytes.Compare(highWaterMark, low) <= 0 || bytes.Compare(highWaterMark, high) >= 0 {
return errors.Errorf("expected high-water mark %v to be between %v and %v",
highWaterMark, low, high)
}
return nil
}

checkInProgressBackupRestore(t, checkBackup, checkRestore)

if _, err := os.Stat(checkpointPath); err == nil {
t.Fatalf("backup checkpoint descriptor at %s not cleaned up", checkpointPath)
} else if !oserror.IsNotExist(err) {
t.Fatal(err)
}
}

func createAndWaitForJob(
t *testing.T,
db *sqlutils.SQLRunner,
Expand Down Expand Up @@ -1702,25 +1646,6 @@ func TestBackupRestoreResume(t *testing.T) {
})
}

func getHighWaterMark(jobID jobspb.JobID, sqlDB *gosql.DB) (roachpb.Key, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
).Scan(&progressBytes); err != nil {
return nil, err
}
var payload jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &payload); err != nil {
return nil, err
}
switch d := payload.Details.(type) {
case *jobspb.Progress_Restore:
return d.Restore.HighWater, nil
default:
return nil, errors.Errorf("unexpected job details type %T", d)
}
}

// TestBackupRestoreControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
// work as intended on backup and restore jobs.
func TestBackupRestoreControlJob(t *testing.T) {
Expand Down Expand Up @@ -9815,11 +9740,6 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The regular interval is a minute which would require us to take a
// very large backup in order to get more than one checkpoint. Instead,
// lower the interval and change it back to normal after the test.
resetCheckpointInterval := TestingShortBackupCheckpointInterval(BackupCheckpointInterval)
defer resetCheckpointInterval()
var numCheckpointsWritten int

// Set the testing knob so we count each time we write a checkpoint.
Expand All @@ -9840,6 +9760,10 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {
tc, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
defer cleanupFn()

// The regular interval is a minute which would require us to take a
// very large backup in order to get more than one checkpoint.
sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.backup.checkpoint_interval = '10ms'")

query := fmt.Sprintf("BACKUP INTO %s", userfile)
sqlDB.Exec(t, query)

Expand Down