Skip to content

Commit

Permalink
Merge pull request #83306 from cockroachdb/blathers/backport-release-22.1-83257
Browse files Browse the repository at this point in the history

release-22.1: sql: update progress in column backfiller non-transactionally
  • Loading branch information
ajwerner authored Jun 28, 2022
2 parents e373bc7 + 03782ad commit cc498d1
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 24 deletions.
58 changes: 34 additions & 24 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,9 +1198,39 @@ func (sc *SchemaChanger) distColumnBackfill(
origNRanges := -1
origFractionCompleted := sc.job.FractionCompleted()
fractionLeft := 1 - origFractionCompleted
readAsOf := sc.clock.Now()
// Gather the initial resume spans for the table.
var todoSpans []roachpb.Span

maybeUpdateFractionProgressed := func() error {
	// Progress for the backfill is measured in ranges of the table's
	// primary index: every range that has dropped out of todoSpans counts
	// as finished. The job may already have recorded some progress (the
	// 10% reserved for the schema change state machine, or an earlier
	// backfill attempt), so the finished share is scaled into whatever
	// portion of the progress bar was still open when we started.
	remaining, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
	if err != nil {
		return err
	}
	// The first count we obtain becomes the baseline for all later ones.
	if origNRanges == -1 {
		origNRanges = remaining
	}
	// Nothing to report until the count falls below the baseline.
	if remaining >= origNRanges {
		return nil
	}
	finishedShare := float32(origNRanges-remaining) / float32(origNRanges)
	completed := origFractionCompleted + fractionLeft*finishedShare
	// The txn argument is deliberately nil so that a fresh transaction is
	// created just for this update. That keeps the write short-lived and
	// decoupled from the backfill work itself, which can run much longer.
	update := jobs.FractionUpdater(completed)
	return sc.job.FractionProgressed(ctx, nil /* txn */, update)
}

readAsOf := sc.clock.Now()
var mutationIdx int
if err := DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans(
Expand All @@ -1212,6 +1242,9 @@ func (sc *SchemaChanger) distColumnBackfill(

for len(todoSpans) > 0 {
log.VEventf(ctx, 2, "backfill: process %+v spans", todoSpans)
if err := maybeUpdateFractionProgressed(); err != nil {
return err
}
// Make sure not to update todoSpans inside the transaction closure as it
// may not commit. Instead write the updated value for todoSpans to this
// variable and assign to todoSpans after committing.
Expand All @@ -1220,29 +1253,6 @@ func (sc *SchemaChanger) distColumnBackfill(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
updatedTodoSpans = todoSpans
// Report schema change progress. We define progress at this point
// as the fraction of fully-backfilled ranges of the primary index of
// the table being scanned. Since we may have already modified the
// fraction completed of our job from the 10% allocated to completing the
// schema change state machine or from a previous backfill attempt,
// we scale that fraction of ranges completed by the remaining fraction
// of the job's progress bar.
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}

if nRanges < origNRanges {
fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
if err := sc.job.FractionProgressed(ctx, txn, jobs.FractionUpdater(fractionCompleted)); err != nil {
return jobs.SimplifyInvalidStatusError(err)
}
}

tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version)
if err != nil {
return err
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8285,3 +8285,65 @@ func TestVirtualColumnNotAllowedInPkeyBefore22_1(t *testing.T) {
require.Error(t, err)
require.Equal(t, "pq: cannot use virtual column \"b\" in primary key", err.Error())
}

// TestColumnBackfillProcessingDoesNotHoldLockOnJobsTable is a regression
// test for progress reporting in the column backfill. Once the backfill has
// moved on to its next chunk and recorded its progress, it must not keep a
// lock on the jobs table while that chunk is being processed.
func TestColumnBackfillProcessingDoesNotHoldLockOnJobsTable(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	// Before each backfill chunk the knob hands the test a fresh gate
	// channel and then blocks until the test closes it. Closing the gate
	// makes the receive yield a nil error, letting the chunk proceed.
	gates := make(chan chan error)
	blockOnGate := func(sp roachpb.Span) error {
		gate := make(chan error)
		gates <- gate
		return <-gate
	}

	params, _ := tests.CreateTestServerParams()
	params.Knobs.DistSQL = &execinfra.TestingKnobs{
		RunBeforeBackfillChunk: blockOnGate,
	}
	// NOTE(review): presumably a chunk size of 1 with a 1ns checkpoint
	// interval forces a progress checkpoint after every chunk; confirm
	// against the knob definitions.
	params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{
		BackfillChunkSize:       1,
		WriteCheckpointInterval: time.Nanosecond,
	}

	s, sqlDB, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(sqlDB)

	// Ten rows, with the table split at each of the values 1 through 9.
	for _, stmt := range []string{
		"CREATE TABLE foo (i INT PRIMARY KEY)",
		"INSERT INTO foo SELECT * FROM generate_series(1, 10)",
		"ALTER TABLE foo SPLIT AT SELECT * FROM generate_series(1, 9)",
	} {
		runner.Exec(t, stmt)
	}

	// Run the schema change in the background; its result arrives on
	// alterErr once the statement returns.
	alterErr := make(chan error)
	go func() {
		_, err := sqlDB.Exec(`
SET use_declarative_schema_changer = 'off';
ALTER TABLE foo ADD COLUMN j INT DEFAULT 42;
`)
		alterErr <- err
	}()

	// Let the first chunk run to completion.
	firstGate := <-gates
	close(firstGate)
	// Hold the backfill at the start of its second chunk.
	secondGate := <-gates
	// While the backfill is paused, the job must already report non-zero
	// progress, and reading the jobs table must succeed, which shows that
	// no lock is being held on it.
	runner.CheckQueryResults(t, `
SELECT fraction_completed > 0
FROM crdb_internal.jobs
WHERE description LIKE '%ADD COLUMN j INT8 DEFAULT 42'`,
		[][]string{{"true"}})
	close(secondGate)

	// Release every remaining chunk as it shows up, until the ALTER
	// statement itself finishes.
	for {
		select {
		case gate := <-gates:
			close(gate)
		case err := <-alterErr:
			require.NoError(t, err)
			return
		}
	}
}

0 comments on commit cc498d1

Please sign in to comment.