Skip to content

Commit

Permalink
backupccl: fix data race with admission pacer
Browse files Browse the repository at this point in the history
We now use one pacer per fileSSTSink.

Fixes #121199.
Fixes #121202.
Fixes #121201.
Fixes #121200.
Fixes #121198.
Fixes #121197.
Fixes #121196.
Fixes #121195.
Fixes #121194.
Fixes #121193.
Fixes #121192.
Fixes #121191.
Fixes #121190.
Fixes #121189.
Fixes #121188.
Fixes #121187.

Release note: None
  • Loading branch information
aadityasondhi committed Mar 27, 2024
1 parent 502fa9b commit 1a26361
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,29 +462,29 @@ func runBackupProcessor(
if len(chunk) > 0 {
todo <- chunk
}
return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
readTime := spec.BackupEndTime.GoTime()

// Passing a nil pacer is effectively a noop if CPU control is disabled.
var pacer *admission.Pacer = nil
if fileSSTSinkElasticCPUControlEnabled.Get(&clusterSettings.SV) {
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
// Passing a nil pacer is effectively a noop if CPU control is disabled.
var pacer *admission.Pacer = nil
if fileSSTSinkElasticCPUControlEnabled.Get(&clusterSettings.SV) {
tenantID, ok := roachpb.ClientTenantFromContext(ctx)
if !ok {
tenantID = roachpb.SystemTenantID
}
pacer = flowCtx.Cfg.AdmissionPacerFactory.NewPacer(
100*time.Millisecond,
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
},
)
}
pacer = flowCtx.Cfg.AdmissionPacerFactory.NewPacer(
100*time.Millisecond,
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.BulkNormalPri,
CreateTime: timeutil.Now().UnixNano(),
BypassAdmission: false,
},
)
}
// It is safe to close a nil pacer.
defer pacer.Close()
// It is safe to close a nil pacer.
defer pacer.Close()

return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
readTime := spec.BackupEndTime.GoTime()
sink := makeFileSSTSink(sinkConf, storage, pacer)
defer func() {
if err := sink.flush(ctx); err != nil {
Expand Down

0 comments on commit 1a26361

Please sign in to comment.