Skip to content

Commit

Permalink
Merge pull request #141461 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-25.1.0-rc-141420

release-25.1.0-rc: upgrades: lock rows before backfilling them during upgrade
  • Loading branch information
dt authored Feb 14, 2025
2 parents 76c2bc9 + 1c61f6e commit e5ec46c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 65 deletions.
28 changes: 14 additions & 14 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ WHERE id = $1
}
}

// Insert the job payload and progress into the system.job_info table.
infoStorage := j.InfoStorage(u.txn)
infoStorage.claimChecked = true
if payloadBytes != nil {
if err := infoStorage.WriteLegacyPayload(ctx, payloadBytes); err != nil {
return err
}
}
if progressBytes != nil {
if err := infoStorage.WriteLegacyProgress(ctx, progressBytes); err != nil {
return err
}
}

v, err := u.txn.GetSystemSchemaVersion(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -360,20 +374,6 @@ WHERE id = $1
}
}

// Insert the job payload and progress into the system.job_info table.
infoStorage := j.InfoStorage(u.txn)
infoStorage.claimChecked = true
if payloadBytes != nil {
if err := infoStorage.WriteLegacyPayload(ctx, payloadBytes); err != nil {
return err
}
}
if progressBytes != nil {
if err := infoStorage.WriteLegacyProgress(ctx, progressBytes); err != nil {
return err
}
}

return nil
}

Expand Down
112 changes: 61 additions & 51 deletions pkg/upgrade/upgrades/v25_1_add_jobs_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package upgrades

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// addJobsTables adds the job_progress, job_progress_history, job_status and
Expand Down Expand Up @@ -92,60 +92,70 @@ var jobsBackfillPageSize = 32
func backfillJobsTablesAndColumns(
ctx context.Context, cv clusterversion.ClusterVersion, d upgrade.TenantDeps,
) error {
every := log.Every(time.Minute)
log.Infof(ctx, "backfilling new jobs tables and columns")
jobsBackfilled := 0
for {
var done int
if err := d.DB.DescsTxn(ctx, func(ctx context.Context, tx descs.Txn) (retErr error) {
// Find rows that have not been backfilled, which we will detect via a
// NULL owner. The coalesce() here isn't necessary since the vtable will
// produce empty strings rather than nulls if a job somehow is missing an
// owner, but since the owner not being null is what breaks the loop the
// extra coalesce here just ensures we wouldn't loop forever if that were to
// change out from under us.
q, err := tx.QueryIteratorEx(ctx, "jobs-backfill-read", tx.KV(), sessiondata.NodeUserSessionDataOverride,
`SELECT
v.job_id,
v.description,
coalesce(v.user_name, ''),
v.finished,
v.error,
v.running_status,
v.fraction_completed,
v.high_water_timestamp
FROM crdb_internal.jobs v
LEFT JOIN system.jobs j ON j.id = v.job_id
WHERE j.owner IS NULL
LIMIT $1`, jobsBackfillPageSize,
)
if err != nil {
return err
}
defer func() {
retErr = errors.CombineErrors(retErr, q.Close())
}()

done = 0
for {
ok, err := q.Next(ctx)
candidateRows, err := d.DB.Executor().QueryBufferedEx(ctx, "jobs-backfill-find", nil, sessiondata.NodeUserSessionDataOverride,
`SELECT id FROM system.jobs WHERE owner IS NULL LIMIT $1`, jobsBackfillPageSize)
if err != nil {
return err
}
if len(candidateRows) == 0 {
// All done!
break
}
for _, candidate := range candidateRows {
id := int(tree.MustBeDInt(candidate[0]))
var backfilled bool
if err := d.DB.DescsTxn(ctx, func(ctx context.Context, tx descs.Txn) (retErr error) {
backfilled = false
// re-read the job in the txn, acquiring locks this time.
found, err := tx.QueryRowEx(ctx, "jobs-backfill-lock-job", tx.KV(), sessiondata.NodeUserSessionDataOverride,
`SELECT id FROM system.jobs WHERE id = $1 AND owner IS NULL FOR UPDATE`, id)
if err != nil {
return err
}
if !ok {
break
// The job is gone or no longer needs backfilling; move on.
if found == nil {
return nil
}

row := q.Cur()
backfilled = true

// Lock the job infos to prevent concurrent modifications there as well.
_, err = tx.QueryBufferedEx(ctx, "jobs-backfill-lock-info", tx.KV(), sessiondata.NodeUserSessionDataOverride,
`SELECT job_id FROM system.job_info WHERE job_id = $1 FOR UPDATE`, id)
if err != nil {
return err
}

// Materialize the job details for the (now locked) job row.
row, err := tx.QueryRowEx(ctx, "jobs-backfill-read", tx.KV(), sessiondata.NodeUserSessionDataOverride,
`SELECT
job_id,
description,
user_name,
finished,
error,
running_status,
fraction_completed,
high_water_timestamp
FROM crdb_internal.jobs
WHERE job_id = $1`, id,
)
if err != nil {
return err
}
// Update the job row.
if _, err := tx.ExecEx(ctx, "jobs-backfill-jobs", tx.KV(),
sessiondata.NodeUserSessionDataOverride,
`UPDATE system.jobs
SET description = $1,
owner = $2,
finished = $3,
error_msg = NULLIF($4, '')
WHERE id = $5`, row[1], row[2], row[3], row[4], row[0],
SET description = $1,
owner = $2,
finished = $3,
error_msg = NULLIF($4, '')
WHERE id = $5`, row[1], row[2], row[3], row[4], row[0],
); err != nil {
return err
}
Expand Down Expand Up @@ -174,16 +184,16 @@ func backfillJobsTablesAndColumns(
return err
}
}
done++
return nil
}); err != nil {
return err
}
if backfilled {
jobsBackfilled++
}
if every.ShouldLog() {
log.Infof(ctx, "backfilled new columns for %d jobs so far", jobsBackfilled)
}
return nil
}); err != nil {
return err
}

jobsBackfilled += done
if done == 0 {
break
}
}
log.Infof(ctx, "finished backfilling new jobs tables and columns for %d jobs", jobsBackfilled)
Expand Down

0 comments on commit e5ec46c

Please sign in to comment.