diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index ed5352bf711d..15a67c000599 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -267,6 +267,20 @@ WHERE id = $1 } } + // Insert the job payload and progress into the system.jobs_info table. + infoStorage := j.InfoStorage(u.txn) + infoStorage.claimChecked = true + if payloadBytes != nil { + if err := infoStorage.WriteLegacyPayload(ctx, payloadBytes); err != nil { + return err + } + } + if progressBytes != nil { + if err := infoStorage.WriteLegacyProgress(ctx, progressBytes); err != nil { + return err + } + } + v, err := u.txn.GetSystemSchemaVersion(ctx) if err != nil { return err @@ -360,20 +374,6 @@ WHERE id = $1 } } - // Insert the job payload and progress into the system.jobs_info table. - infoStorage := j.InfoStorage(u.txn) - infoStorage.claimChecked = true - if payloadBytes != nil { - if err := infoStorage.WriteLegacyPayload(ctx, payloadBytes); err != nil { - return err - } - } - if progressBytes != nil { - if err := infoStorage.WriteLegacyProgress(ctx, progressBytes); err != nil { - return err - } - } - return nil } diff --git a/pkg/upgrade/upgrades/v25_1_add_jobs_tables.go b/pkg/upgrade/upgrades/v25_1_add_jobs_tables.go index 7a893b20b172..3d6d2955d380 100644 --- a/pkg/upgrade/upgrades/v25_1_add_jobs_tables.go +++ b/pkg/upgrade/upgrades/v25_1_add_jobs_tables.go @@ -7,6 +7,7 @@ package upgrades import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -18,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/upgrade" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) // addJobsTables adds the job_progress, job_progress_history, job_status and @@ -92,60 +92,70 @@ var jobsBackfillPageSize = 32 func backfillJobsTablesAndColumns( ctx context.Context, cv clusterversion.ClusterVersion, d upgrade.TenantDeps, ) error { + every := log.Every(time.Minute) log.Infof(ctx, "backfilling new jobs tables and columns") jobsBackfilled := 0 for { - var done int - if err := d.DB.DescsTxn(ctx, func(ctx context.Context, tx descs.Txn) (retErr error) { - // Find rows that have not been backfilled, which we will detect via a - // NULL owner. The coalesce() here isn't necessary since the vtable will - // produce empty strings rather than nulls if a job somehow is missing an - // owner, but since the owner not being null is what breaks the loop the - // extra coalesce here just ensures wouldn't loop forever if that were to - // change out from under us. - q, err := tx.QueryIteratorEx(ctx, "jobs-backfill-read", tx.KV(), sessiondata.NodeUserSessionDataOverride, - `SELECT - v.job_id, - v.description, - coalesce(v.user_name, ''), - v.finished, - v.error, - v.running_status, - v.fraction_completed, - v.high_water_timestamp - FROM crdb_internal.jobs v - LEFT JOIN system.jobs j ON j.id = v.job_id - WHERE j.owner IS NULL - LIMIT $1`, jobsBackfillPageSize, - ) - if err != nil { - return err - } - defer func() { - retErr = errors.CombineErrors(retErr, q.Close()) - }() - - done = 0 - for { - ok, err := q.Next(ctx) + candidateRows, err := d.DB.Executor().QueryBufferedEx(ctx, "jobs-backfill-find", nil, sessiondata.NodeUserSessionDataOverride, + `SELECT id FROM system.jobs WHERE owner IS NULL LIMIT $1`, jobsBackfillPageSize) + if err != nil { + return err + } + if len(candidateRows) == 0 { + // All done! + break + } + for _, candidate := range candidateRows { + id := int(tree.MustBeDInt(candidate[0])) + var backfilled bool + if err := d.DB.DescsTxn(ctx, func(ctx context.Context, tx descs.Txn) (retErr error) { + backfilled = false + // re-read the job in the txn, acquiring locks this time. + found, err := tx.QueryRowEx(ctx, "jobs-backfill-lock-job", tx.KV(), sessiondata.NodeUserSessionDataOverride, + `SELECT id FROM system.jobs WHERE id = $1 AND owner IS NULL FOR UPDATE`, id) if err != nil { return err } - if !ok { - break + // The job isn't here anymore and needing backfill; move on. + if found == nil { + return nil } - row := q.Cur() + backfilled = true + + // Lock the job infos to prevent concurrent modifications there as well. + _, err = tx.QueryBufferedEx(ctx, "jobs-backfill-lock-info", tx.KV(), sessiondata.NodeUserSessionDataOverride, + `SELECT job_id FROM system.job_info WHERE job_id = $1 FOR UPDATE`, id) + if err != nil { + return err + } + // Materialize the job details for the (now locked) job row. + row, err := tx.QueryRowEx(ctx, "jobs-backfill-read", tx.KV(), sessiondata.NodeUserSessionDataOverride, + `SELECT + job_id, + description, + user_name, + finished, + error, + running_status, + fraction_completed, + high_water_timestamp + FROM crdb_internal.jobs + WHERE job_id = $1`, id, + ) + if err != nil { + return err + } // Update the job row. if _, err := tx.ExecEx(ctx, "jobs-backfill-jobs", tx.KV(), sessiondata.NodeUserSessionDataOverride, `UPDATE system.jobs - SET description = $1, - owner = $2, - finished = $3, - error_msg = NULLIF($4, '') - WHERE id = $5`, row[1], row[2], row[3], row[4], row[0], + SET description = $1, + owner = $2, + finished = $3, + error_msg = NULLIF($4, '') + WHERE id = $5`, row[1], row[2], row[3], row[4], row[0], ); err != nil { return err } @@ -174,16 +184,16 @@ func backfillJobsTablesAndColumns( return err } } - done++ + return nil + }); err != nil { + return err + } + if backfilled { + jobsBackfilled++ + } + if every.ShouldLog() { + log.Infof(ctx, "backfilled new columns for %d jobs so far", jobsBackfilled) } - return nil - }); err != nil { - return err - } - - jobsBackfilled += done - if done == 0 { - break } } log.Infof(ctx, "finished backfilling new jobs tables and columns for %d jobs", jobsBackfilled)