diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 3deb7499d707..0c764e7ee7ab 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -1198,9 +1198,39 @@ func (sc *SchemaChanger) distColumnBackfill( origNRanges := -1 origFractionCompleted := sc.job.FractionCompleted() fractionLeft := 1 - origFractionCompleted - readAsOf := sc.clock.Now() // Gather the initial resume spans for the table. var todoSpans []roachpb.Span + + maybeUpdateFractionProgressed := func() error { + // Report schema change progress. We define progress at this point + // as the fraction of fully-backfilled ranges of the primary index of + // the table being scanned. Since we may have already modified the + // fraction completed of our job from the 10% allocated to completing the + // schema change state machine or from a previous backfill attempt, + // we scale that fraction of ranges completed by the remaining fraction + // of the job's progress bar. + nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans) + if err != nil { + return err + } + if origNRanges == -1 { + origNRanges = nRanges + } + if nRanges >= origNRanges { + return nil + } + fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges) + fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished + // Note that this explicitly uses a nil txn, which will lead to a new + // transaction being created as a part of this update. We want this + // update operation to be short and to not be coupled to any other + // backfill work, which may take much longer. + return sc.job.FractionProgressed( + ctx, nil /* txn */, jobs.FractionUpdater(fractionCompleted), + ) + } + + readAsOf := sc.clock.Now() var mutationIdx int if err := DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) { todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans( @@ -1212,6 +1242,9 @@ func (sc *SchemaChanger) distColumnBackfill( for len(todoSpans) > 0 { log.VEventf(ctx, 2, "backfill: process %+v spans", todoSpans) + if err := maybeUpdateFractionProgressed(); err != nil { + return err + } // Make sure not to update todoSpans inside the transaction closure as it // may not commit. Instead write the updated value for todoSpans to this // variable and assign to todoSpans after committing. @@ -1220,29 +1253,6 @@ func (sc *SchemaChanger) distColumnBackfill( ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ) error { updatedTodoSpans = todoSpans - // Report schema change progress. We define progress at this point - // as the fraction of fully-backfilled ranges of the primary index of - // the table being scanned. Since we may have already modified the - // fraction completed of our job from the 10% allocated to completing the - // schema change state machine or from a previous backfill attempt, - // we scale that fraction of ranges completed by the remaining fraction - // of the job's progress bar. - nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans) - if err != nil { - return err - } - if origNRanges == -1 { - origNRanges = nRanges - } - - if nRanges < origNRanges { - fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges) - fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished - if err := sc.job.FractionProgressed(ctx, txn, jobs.FractionUpdater(fractionCompleted)); err != nil { - return jobs.SimplifyInvalidStatusError(err) - } - } - tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version) if err != nil { return err diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 15846324ad0c..f0f8df8e61ae 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -8285,3 +8285,65 @@ func TestVirtualColumnNotAllowedInPkeyBefore22_1(t *testing.T) { require.Error(t, err) require.Equal(t, "pq: cannot use virtual column \"b\" in primary key", err.Error()) } + +// TestColumnBackfillProcessingDoesNotHoldLockOnJobsTable is a +// regression test to ensure that when the column backfill progresses +// to the next backfill chunk and it needs to update its progress, it +// does not hold a lock on the jobs table for the duration of processing +// the next chunk. +func TestColumnBackfillProcessingDoesNotHoldLockOnJobsTable(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + params, _ := tests.CreateTestServerParams() + chCh := make(chan chan error) + params.Knobs.DistSQL = &execinfra.TestingKnobs{ + RunBeforeBackfillChunk: func(sp roachpb.Span) error { + ch := make(chan error) + chCh <- ch + return <-ch + }, + } + params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ + BackfillChunkSize: 1, + WriteCheckpointInterval: time.Nanosecond, + } + + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "INSERT INTO foo SELECT * FROM generate_series(1, 10)") + tdb.Exec(t, "ALTER TABLE foo SPLIT AT SELECT * FROM generate_series(1, 9)") + errCh := make(chan error) + go func() { + _, err := sqlDB.Exec(` +SET use_declarative_schema_changer = 'off'; +ALTER TABLE foo ADD COLUMN j INT DEFAULT 42; +`) + errCh <- err + }() + // Wait for one iteration. + close(<-chCh) + // Wait for another iteration. + ch := <-chCh + // Ensure that the progress has been set to something non-zero, and + // that a lock has not been held. + tdb.CheckQueryResults(t, ` +SELECT fraction_completed > 0 + FROM crdb_internal.jobs + WHERE description LIKE '%ADD COLUMN j INT8 DEFAULT 42'`, + [][]string{{"true"}}) + close(ch) + for { + select { + case ch := <-chCh: + close(ch) + case err := <-errCh: + require.NoError(t, err) + return + } + } +}