Skip to content

Commit

Permalink
ttl: use JobRegistry.UpdateJobWithTxn to avoid race condition
Browse files Browse the repository at this point in the history
refs #85800

UpdateJobWithTxn updates the job progress with useReadLock=true, which prevents
concurrent update attempts from overwriting each other's changes.

Release justification: Fix test flake.

Release note: None
  • Loading branch information
ecwall committed Aug 17, 2022
1 parent 2aeed01 commit 83776cb
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (ttl *ttlProcessor) work(ctx context.Context) error {
deleteRateLimit,
)

rowCount := int64(0)
processorRowCount := int64(0)

var relationName string
var pkColumns []string
Expand Down Expand Up @@ -151,7 +152,7 @@ func (ttl *ttlProcessor) work(ctx context.Context) error {
ttlSpec.PreSelectStatement,
)
// add before returning err in case of partial success
atomic.AddInt64(&rowCount, rangeRowCount)
atomic.AddInt64(&processorRowCount, rangeRowCount)
metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
Expand Down Expand Up @@ -224,18 +225,26 @@ func (ttl *ttlProcessor) work(ctx context.Context) error {
return err
}

job, err := serverCfg.JobRegistry.LoadJob(ctx, ttlSpec.JobID)
if err != nil {
return err
}
return db.Txn(ctx, func(_ context.Context, txn *kv.Txn) error {
return job.Update(ctx, txn, func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
jobID := ttlSpec.JobID
return serverCfg.JobRegistry.UpdateJobWithTxn(
ctx,
jobID,
nil, /* txn */
true, /* useReadLock */
func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += rowCount
existingRowCount := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount
progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += processorRowCount
ju.UpdateProgress(progress)
log.VInfof(
ctx,
2, /* level */
"TTL processorRowCount updated jobID=%d processorID=%d tableID=%d existingRowCount=%d processorRowCount=%d progress=%s",
jobID, ttl.ProcessorID, details.TableID, existingRowCount, processorRowCount, progress,
)
return nil
})
})
},
)
}

// rangeRowCount should be checked even if the function returns an error, because some rows may have been deleted before the failure.
Expand Down Expand Up @@ -348,13 +357,13 @@ func runTTLOnRange(
defer tokens.Consume()

start := timeutil.Now()
rowCount, err := deleteBuilder.run(ctx, ie, txn, deleteBatch)
batchRowCount, err := deleteBuilder.run(ctx, ie, txn, deleteBatch)
if err != nil {
return err
}

metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
rangeRowCount += int64(rowCount)
rangeRowCount += int64(batchRowCount)
return nil
}); err != nil {
return rangeRowCount, errors.Wrapf(err, "error during row deletion")
Expand Down

0 comments on commit 83776cb

Please sign in to comment.