diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index c69262db25d4..9aafef239ec2 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -511,118 +511,144 @@ func (sc *SchemaChanger) distBackfill(
 	}
 	chunkSize := sc.getChunkSize(backfillChunkSize)
 
-	origNRanges := -1
-	origFractionCompleted := sc.job.FractionCompleted()
-	fractionLeft := 1 - origFractionCompleted
-	readAsOf := sc.clock.Now()
-	for {
-		var spans []roachpb.Span
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			var err error
-			spans, _, _, err = distsqlrun.GetResumeSpans(
-				ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
-			return err
-		}); err != nil {
-			return err
-		}
+	if err := sc.ExtendLease(ctx, lease); err != nil {
+		return err
+	}
 
-		if len(spans) <= 0 {
-			break
+	// Start a background goroutine that periodically extends the schema change lease.
+	extendLeases := make(chan struct{})
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(func(ctx context.Context) error {
+		tick := time.NewTicker(schemaChangeLeaseDuration.Get(&sc.settings.SV) / time.Duration(4))
+		defer tick.Stop()
+		ctxDone := ctx.Done()
+		for {
+			select {
+			case <-extendLeases:
+				return nil
+			case <-ctxDone:
+				return nil
+			case <-tick.C:
+				if err := sc.ExtendLease(ctx, lease); err != nil {
+					return err
+				}
+			}
 		}
+	})
 
-		if err := sc.ExtendLease(ctx, lease); err != nil {
-			return err
-		}
-		log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			// Report schema change progress. We define progress at this point
-			// as the the fraction of fully-backfilled ranges of the primary index of
-			// the table being scanned. Since we may have already modified the
-			// fraction completed of our job from the 10% allocated to completing the
-			// schema change state machine or from a previous backfill attempt,
-			// we scale that fraction of ranges completed by the remaining fraction
-			// of the job's progress bar.
-			nRanges, err := sc.nRanges(ctx, txn, spans)
-			if err != nil {
+	g.GoCtx(func(ctx context.Context) error {
+		defer close(extendLeases)
+		origNRanges := -1
+		origFractionCompleted := sc.job.FractionCompleted()
+		fractionLeft := 1 - origFractionCompleted
+		readAsOf := sc.clock.Now()
+		for {
+			var spans []roachpb.Span
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				var err error
+				spans, _, _, err = distsqlrun.GetResumeSpans(
+					ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
+				return err
+			}); err != nil {
 				return err
-			}
-			if origNRanges == -1 {
-				origNRanges = nRanges
 			}
 
-			if nRanges < origNRanges {
-				fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
-				fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
-				if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
-					return jobs.SimplifyInvalidStatusError(err)
-				}
+			if len(spans) <= 0 {
+				break
 			}
 
-			tc := &TableCollection{leaseMgr: sc.leaseMgr}
-			// Use a leased table descriptor for the backfill.
-			defer tc.releaseTables(ctx)
-			tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
-			if err != nil {
-				return err
-			}
-			// otherTableDescs contains any other table descriptors required by the
-			// backfiller processor.
-			var otherTableDescs []sqlbase.TableDescriptor
-			if backfillType == columnBackfill {
-				fkTables, err := row.MakeFkMetadata(
-					ctx,
-					tableDesc,
-					row.CheckUpdates,
-					row.NoLookup,
-					row.NoCheckPrivilege,
-					nil, /* AnalyzeExprFunction */
-					nil, /* CheckHelper */
-				)
+			log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				// Report schema change progress. We define progress at this point
+				// as the fraction of fully-backfilled ranges of the primary index of
+				// the table being scanned. Since we may have already modified the
+				// fraction completed of our job from the 10% allocated to completing the
+				// schema change state machine or from a previous backfill attempt,
+				// we scale that fraction of ranges completed by the remaining fraction
+				// of the job's progress bar.
+				nRanges, err := sc.nRanges(ctx, txn, spans)
 				if err != nil {
 					return err
 				}
+				if origNRanges == -1 {
+					origNRanges = nRanges
+				}
+
+				if nRanges < origNRanges {
+					fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
+					fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
+					if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
+						return jobs.SimplifyInvalidStatusError(err)
+					}
+				}
 
-				for k := range fkTables {
-					table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
+				tc := &TableCollection{leaseMgr: sc.leaseMgr}
+				// Use a leased table descriptor for the backfill.
+				defer tc.releaseTables(ctx)
+				tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
+				if err != nil {
+					return err
+				}
+				// otherTableDescs contains any other table descriptors required by the
+				// backfiller processor.
+				var otherTableDescs []sqlbase.TableDescriptor
+				if backfillType == columnBackfill {
+					fkTables, err := row.MakeFkMetadata(
+						ctx,
+						tableDesc,
+						row.CheckUpdates,
+						row.NoLookup,
+						row.NoCheckPrivilege,
+						nil, /* AnalyzeExprFunction */
+						nil, /* CheckHelper */
+					)
					if err != nil {
						return err
					}
-					otherTableDescs = append(otherTableDescs, *table.TableDesc())
+
+					for k := range fkTables {
+						table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
+						if err != nil {
+							return err
+						}
+						otherTableDescs = append(otherTableDescs, *table.TableDesc())
+					}
 				}
-			}
 
-			rw := &errOnlyResultWriter{}
-			recv := MakeDistSQLReceiver(
-				ctx,
-				rw,
-				tree.Rows, /* stmtType - doesn't matter here since no result are produced */
-				sc.rangeDescriptorCache,
-				sc.leaseHolderCache,
-				nil, /* txn - the flow does not run wholly in a txn */
-				func(ts hlc.Timestamp) {
-					_ = sc.clock.Update(ts)
-				},
-				evalCtx.Tracing,
-			)
-			defer recv.Release()
-			planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
-			plan, err := sc.distSQLPlanner.createBackfiller(
-				planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
-			)
-			if err != nil {
+				rw := &errOnlyResultWriter{}
+				recv := MakeDistSQLReceiver(
+					ctx,
+					rw,
+					tree.Rows, /* stmtType - doesn't matter here since no results are produced */
+					sc.rangeDescriptorCache,
+					sc.leaseHolderCache,
+					nil, /* txn - the flow does not run wholly in a txn */
+					func(ts hlc.Timestamp) {
+						_ = sc.clock.Update(ts)
+					},
+					evalCtx.Tracing,
+				)
+				defer recv.Release()
+				planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
+				plan, err := sc.distSQLPlanner.createBackfiller(
+					planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
+				)
+				if err != nil {
+					return err
+				}
+				sc.distSQLPlanner.Run(
+					planCtx,
+					nil, /* txn - the processors manage their own transactions */
+					&plan, recv, evalCtx,
+					nil, /* finishedSetupFn */
+				)
+				return rw.Err()
+			}); err != nil {
 				return err
 			}
-			sc.distSQLPlanner.Run(
-				planCtx,
-				nil, /* txn - the processors manage their own transactions */
-				&plan, recv, evalCtx,
-				nil, /* finishedSetupFn */
-			)
-			return rw.Err()
-		}); err != nil {
-			return err
 		}
-	}
-	return nil
+		return nil
+	})
+	return g.Wait()
 }
 
 // validate the new indexes being added
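The new structure above boils down to one pattern: run the backfill loop in one goroutine while a second goroutine keeps the schema change lease alive on a ticker, and stop the keepalive by closing a channel once the loop returns. Below is a minimal, standalone sketch of that pattern, assuming golang.org/x/sync/errgroup as a stand-in for the internal ctxgroup package; the names runWithLeaseKeepalive, extendLease, doChunkOfWork, and leaseDuration are illustrative only and do not appear in the patch.

// leasekeepalive_sketch.go: illustrative sketch, not CockroachDB code.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// extendLease and doChunkOfWork stand in for sc.ExtendLease and one pass of
// the backfill loop; both are hypothetical helpers for this sketch.
func extendLease(ctx context.Context) error {
	fmt.Println("lease extended")
	return nil
}

func doChunkOfWork(ctx context.Context, chunk int) error {
	time.Sleep(200 * time.Millisecond)
	fmt.Println("processed chunk", chunk)
	return nil
}

func runWithLeaseKeepalive(ctx context.Context, leaseDuration time.Duration) error {
	done := make(chan struct{}) // closed by the worker when it finishes
	g, ctx := errgroup.WithContext(ctx)

	// Keepalive goroutine: extend the lease at a quarter of its duration,
	// mirroring schemaChangeLeaseDuration / 4 in the diff above.
	g.Go(func() error {
		tick := time.NewTicker(leaseDuration / 4)
		defer tick.Stop()
		for {
			select {
			case <-done:
				return nil
			case <-ctx.Done():
				return nil
			case <-tick.C:
				if err := extendLease(ctx); err != nil {
					return err
				}
			}
		}
	})

	// Worker goroutine: do the long-running work and, on the way out, close
	// the channel so the keepalive goroutine exits too.
	g.Go(func() error {
		defer close(done)
		for i := 0; i < 5; i++ {
			if err := doChunkOfWork(ctx, i); err != nil {
				return err
			}
		}
		return nil
	})

	// Wait returns the first non-nil error from either goroutine, if any.
	return g.Wait()
}

func main() {
	if err := runWithLeaseKeepalive(context.Background(), time.Second); err != nil {
		fmt.Println("backfill failed:", err)
	}
}

Returning nil (rather than ctx.Err()) when the context is canceled mirrors the diff: the keepalive goroutine never contributes its own error, so group Wait reports the worker's error, which is the one that triggered cancellation in the first place.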
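A note on the progress arithmetic that moves into the worker goroutine: fractionCompleted = origFractionCompleted + fractionLeft * fractionRangesFinished scales the share of finished ranges by whatever portion of the job's progress bar was still unclaimed when the backfill started. With illustrative numbers: if the job was already at 10% (origFractionCompleted = 0.1, so fractionLeft = 0.9) and 40 of the original 100 ranges still have resume spans, then fractionRangesFinished = (100 - 40) / 100 = 0.6 and the reported fraction is 0.1 + 0.9 × 0.6 = 0.64.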