Merge pull request #36734 from dt/backport19.1-36641
release-19.1: sql: maintain leases minutely in background loop during dist backfill
dt authored Apr 10, 2019
2 parents 0d20eff + 1de623c commit 7669ac5
Showing 1 changed file with 119 additions and 93 deletions.
212 changes: 119 additions & 93 deletions pkg/sql/backfill.go
@@ -511,118 +511,144 @@ func (sc *SchemaChanger) distBackfill(
 	}
 	chunkSize := sc.getChunkSize(backfillChunkSize)
 
-	origNRanges := -1
-	origFractionCompleted := sc.job.FractionCompleted()
-	fractionLeft := 1 - origFractionCompleted
-	readAsOf := sc.clock.Now()
-	for {
-		var spans []roachpb.Span
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			var err error
-			spans, _, _, err = distsqlrun.GetResumeSpans(
-				ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
-			return err
-		}); err != nil {
-			return err
-		}
-		if err := sc.ExtendLease(ctx, lease); err != nil {
-			return err
-		}
-
-		if len(spans) <= 0 {
-			break
-		}
-
-		if err := sc.ExtendLease(ctx, lease); err != nil {
-			return err
-		}
-		log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			// Report schema change progress. We define progress at this point
-			// as the the fraction of fully-backfilled ranges of the primary index of
-			// the table being scanned. Since we may have already modified the
-			// fraction completed of our job from the 10% allocated to completing the
-			// schema change state machine or from a previous backfill attempt,
-			// we scale that fraction of ranges completed by the remaining fraction
-			// of the job's progress bar.
-			nRanges, err := sc.nRanges(ctx, txn, spans)
-			if err != nil {
-				return err
-			}
-			if origNRanges == -1 {
-				origNRanges = nRanges
-			}
-
-			if nRanges < origNRanges {
-				fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
-				fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
-				if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
-					return jobs.SimplifyInvalidStatusError(err)
-				}
-			}
-
-			tc := &TableCollection{leaseMgr: sc.leaseMgr}
-			// Use a leased table descriptor for the backfill.
-			defer tc.releaseTables(ctx)
-			tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
-			if err != nil {
-				return err
-			}
-			// otherTableDescs contains any other table descriptors required by the
-			// backfiller processor.
-			var otherTableDescs []sqlbase.TableDescriptor
-			if backfillType == columnBackfill {
-				fkTables, err := row.MakeFkMetadata(
-					ctx,
-					tableDesc,
-					row.CheckUpdates,
-					row.NoLookup,
-					row.NoCheckPrivilege,
-					nil, /* AnalyzeExprFunction */
-					nil, /* CheckHelper */
-				)
-				if err != nil {
-					return err
-				}
-
-				for k := range fkTables {
-					table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
-					if err != nil {
-						return err
-					}
-					otherTableDescs = append(otherTableDescs, *table.TableDesc())
-				}
-			}
-			rw := &errOnlyResultWriter{}
-			recv := MakeDistSQLReceiver(
-				ctx,
-				rw,
-				tree.Rows, /* stmtType - doesn't matter here since no result are produced */
-				sc.rangeDescriptorCache,
-				sc.leaseHolderCache,
-				nil, /* txn - the flow does not run wholly in a txn */
-				func(ts hlc.Timestamp) {
-					_ = sc.clock.Update(ts)
-				},
-				evalCtx.Tracing,
-			)
-			defer recv.Release()
-			planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
-			plan, err := sc.distSQLPlanner.createBackfiller(
-				planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
-			)
-			if err != nil {
-				return err
-			}
-			sc.distSQLPlanner.Run(
-				planCtx,
-				nil, /* txn - the processors manage their own transactions */
-				&plan, recv, evalCtx,
-				nil, /* finishedSetupFn */
-			)
-			return rw.Err()
-		}); err != nil {
-			return err
-		}
-	}
-	return nil
+	// start a background goroutine to extend the lease minutely.
+	extendLeases := make(chan struct{})
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(func(ctx context.Context) error {
+		tick := time.NewTicker(schemaChangeLeaseDuration.Get(&sc.settings.SV) / time.Duration(4))
+		defer tick.Stop()
+		ctxDone := ctx.Done()
+		for {
+			select {
+			case <-extendLeases:
+				return nil
+			case <-ctxDone:
+				return nil
+			case <-tick.C:
+				if err := sc.ExtendLease(ctx, lease); err != nil {
+					return err
+				}
+			}
+		}
+	})
+
+	g.GoCtx(func(ctx context.Context) error {
+		defer close(extendLeases)
+		origNRanges := -1
+		origFractionCompleted := sc.job.FractionCompleted()
+		fractionLeft := 1 - origFractionCompleted
+		readAsOf := sc.clock.Now()
+		for {
+			var spans []roachpb.Span
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				var err error
+				spans, _, _, err = distsqlrun.GetResumeSpans(
+					ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
+				return err
+			}); err != nil {
+				return err
+			}
+
+			if len(spans) <= 0 {
+				break
+			}
+
+			log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				// Report schema change progress. We define progress at this point
+				// as the the fraction of fully-backfilled ranges of the primary index of
+				// the table being scanned. Since we may have already modified the
+				// fraction completed of our job from the 10% allocated to completing the
+				// schema change state machine or from a previous backfill attempt,
+				// we scale that fraction of ranges completed by the remaining fraction
+				// of the job's progress bar.
+				nRanges, err := sc.nRanges(ctx, txn, spans)
+				if err != nil {
+					return err
+				}
+				if origNRanges == -1 {
+					origNRanges = nRanges
+				}
+
+				if nRanges < origNRanges {
+					fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
+					fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
+					if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
+						return jobs.SimplifyInvalidStatusError(err)
+					}
+				}
+
+				tc := &TableCollection{leaseMgr: sc.leaseMgr}
+				// Use a leased table descriptor for the backfill.
+				defer tc.releaseTables(ctx)
+				tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
+				if err != nil {
+					return err
+				}
+				// otherTableDescs contains any other table descriptors required by the
+				// backfiller processor.
+				var otherTableDescs []sqlbase.TableDescriptor
+				if backfillType == columnBackfill {
+					fkTables, err := row.MakeFkMetadata(
+						ctx,
+						tableDesc,
+						row.CheckUpdates,
+						row.NoLookup,
+						row.NoCheckPrivilege,
+						nil, /* AnalyzeExprFunction */
+						nil, /* CheckHelper */
+					)
+					if err != nil {
+						return err
+					}
+
+					for k := range fkTables {
+						table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
+						if err != nil {
+							return err
+						}
+						otherTableDescs = append(otherTableDescs, *table.TableDesc())
+					}
+				}
+				rw := &errOnlyResultWriter{}
+				recv := MakeDistSQLReceiver(
+					ctx,
+					rw,
+					tree.Rows, /* stmtType - doesn't matter here since no result are produced */
+					sc.rangeDescriptorCache,
+					sc.leaseHolderCache,
+					nil, /* txn - the flow does not run wholly in a txn */
+					func(ts hlc.Timestamp) {
+						_ = sc.clock.Update(ts)
+					},
+					evalCtx.Tracing,
+				)
+				defer recv.Release()
+				planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
+				plan, err := sc.distSQLPlanner.createBackfiller(
+					planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
+				)
+				if err != nil {
+					return err
+				}
+				sc.distSQLPlanner.Run(
+					planCtx,
+					nil, /* txn - the processors manage their own transactions */
+					&plan, recv, evalCtx,
+					nil, /* finishedSetupFn */
+				)
+				return rw.Err()
+			}); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	return g.Wait()
 }
 
 // validate the new indexes being added
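The shape of this change is a reusable pattern: before, the backfill loop paused between batches to call sc.ExtendLease synchronously; now one goroutine renews the lease on a ticker (a quarter of the lease duration, so a renewal can fail and be retried well before the lease expires) while a second goroutine runs the backfill loop uninterrupted, a channel close tells the renewer to stop, and the group's Wait surfaces the first error from either side. Below is a minimal standalone sketch of that pattern, not the CockroachDB code itself: it uses golang.org/x/sync/errgroup in place of the internal ctxgroup wrapper, and extendLease, doBackfillStep, and leaseDuration are hypothetical stand-ins for sc.ExtendLease, one batch of span processing, and the schemaChangeLeaseDuration setting.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// leaseDuration is a hypothetical stand-in for the schemaChangeLeaseDuration
// cluster setting, scaled down so the sketch finishes quickly.
const leaseDuration = 4 * time.Second

// extendLease is a hypothetical stand-in for sc.ExtendLease.
func extendLease(ctx context.Context) error {
	fmt.Println("lease extended")
	return nil
}

// doBackfillStep is a hypothetical stand-in for processing one batch of spans.
// It watches ctx so that an error in the renewer cancels it promptly.
func doBackfillStep(ctx context.Context, i int) error {
	select {
	case <-time.After(time.Second): // pretend to process a batch
		fmt.Println("chunk", i, "done")
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func run(ctx context.Context) error {
	workDone := make(chan struct{}) // closed by the worker when it finishes
	g, ctx := errgroup.WithContext(ctx)

	// Renewer: tick at a quarter of the lease duration, mirroring the commit,
	// so the lease is refreshed long before it can expire.
	g.Go(func() error {
		tick := time.NewTicker(leaseDuration / 4)
		defer tick.Stop()
		for {
			select {
			case <-workDone:
				return nil
			case <-ctx.Done():
				// Worker failed or caller canceled; exit quietly and let the
				// other goroutine's error be the one Wait returns.
				return nil
			case <-tick.C:
				if err := extendLease(ctx); err != nil {
					return err // cancels ctx, which also stops the worker
				}
			}
		}
	})

	// Worker: runs the long operation without ever pausing to renew the lease.
	g.Go(func() error {
		defer close(workDone)
		for i := 0; i < 3; i++ {
			if err := doBackfillStep(ctx, i); err != nil {
				return err
			}
		}
		return nil
	})

	return g.Wait() // first non-nil error from either goroutine, if any
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Println("backfill failed:", err)
	}
}

The channel close is what keeps shutdown clean: the worker always closes workDone on its way out, so the renewer never outlives the backfill, and a failed renewal cancels the shared context so the worker does not keep writing under a lease it may have lost.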
