sql: maintain leases minutely in background loop during dist backfill
Extending the lease in the same loop that runs the distsql flow means lease renewal relies on that flow finishing promptly --
if the flow runs long (e.g. waiting on a flush), the loop is blocked and may fail to loop around and extend
the lease in time.

This moves lease extension to a separate, background loop, removing the strict timing concern from the distsql flow.
As structured, job progress is still only updated once per loop, so the flow should still aim to be bounded in
execution time to avoid holding up progress updates, but if it does run long, it no longer risks the
schema change lease being lost.

This opens the possibility of raising the default backfill run duration -- it was set conservatively because any
overrun, e.g. blocking on a single slow request, could cause the problems described above, so its default had to be
*much* lower than the lease time.

Release note: none.
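The shape of the change, as a minimal self-contained sketch: one goroutine renews the lease on a ticker at a quarter of the lease duration while a second runs the work loop and signals completion by closing a channel. This sketch uses golang.org/x/sync/errgroup in place of CockroachDB's ctxgroup wrapper, and run, extendLease, and doChunk are hypothetical stand-ins for illustration, not APIs from the patch.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// run pairs a worker loop with a lease-renewal loop: renewal ticks at a
// quarter of the lease duration, and the worker signals completion by
// closing a channel. extendLease and doChunk are illustrative stand-ins.
func run(
	ctx context.Context,
	leaseDuration time.Duration,
	extendLease func(context.Context) error,
	doChunk func(context.Context) (done bool, err error),
) error {
	g, ctx := errgroup.WithContext(ctx)
	workDone := make(chan struct{})

	// Lease loop: renew frequently enough that a missed tick or two
	// still leaves headroom before the lease expires.
	g.Go(func() error {
		tick := time.NewTicker(leaseDuration / 4)
		defer tick.Stop()
		for {
			select {
			case <-workDone:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			case <-tick.C:
				if err := extendLease(ctx); err != nil {
					return err // cancels ctx; a ctx-aware worker stops too
				}
			}
		}
	})

	// Worker loop: free to block on a slow chunk without starving
	// lease renewal.
	g.Go(func() error {
		defer close(workDone)
		for {
			done, err := doChunk(ctx)
			if err != nil || done {
				return err
			}
		}
	})

	return g.Wait()
}

func main() {
	remaining := 3
	err := run(context.Background(), 200*time.Millisecond,
		func(context.Context) error { fmt.Println("lease extended"); return nil },
		func(context.Context) (bool, error) {
			time.Sleep(90 * time.Millisecond) // simulate a slow chunk
			remaining--
			return remaining == 0, nil
		},
	)
	fmt.Println("done, err =", err)
}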
dt committed Apr 10, 2019
1 parent 76e8e78 commit 1de623c
Showing 1 changed file with 119 additions and 93 deletions.

pkg/sql/backfill.go
@@ -511,118 +511,144 @@ func (sc *SchemaChanger) distBackfill(
 	}
 	chunkSize := sc.getChunkSize(backfillChunkSize)
 
-	origNRanges := -1
-	origFractionCompleted := sc.job.FractionCompleted()
-	fractionLeft := 1 - origFractionCompleted
-	readAsOf := sc.clock.Now()
-	for {
-		var spans []roachpb.Span
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			var err error
-			spans, _, _, err = distsqlrun.GetResumeSpans(
-				ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
-			return err
-		}); err != nil {
-			return err
-		}
-		if err := sc.ExtendLease(ctx, lease); err != nil {
-			return err
-		}
-
-		if len(spans) <= 0 {
-			break
-		}
-
-		if err := sc.ExtendLease(ctx, lease); err != nil {
-			return err
-		}
-		log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
-		if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-			// Report schema change progress. We define progress at this point
-			// as the fraction of fully-backfilled ranges of the primary index of
-			// the table being scanned. Since we may have already modified the
-			// fraction completed of our job from the 10% allocated to completing the
-			// schema change state machine or from a previous backfill attempt,
-			// we scale that fraction of ranges completed by the remaining fraction
-			// of the job's progress bar.
-			nRanges, err := sc.nRanges(ctx, txn, spans)
-			if err != nil {
-				return err
-			}
-			if origNRanges == -1 {
-				origNRanges = nRanges
-			}
-
-			if nRanges < origNRanges {
-				fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
-				fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
-				if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
-					return jobs.SimplifyInvalidStatusError(err)
-				}
-			}
-
-			tc := &TableCollection{leaseMgr: sc.leaseMgr}
-			// Use a leased table descriptor for the backfill.
-			defer tc.releaseTables(ctx)
-			tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
-			if err != nil {
-				return err
-			}
-			// otherTableDescs contains any other table descriptors required by the
-			// backfiller processor.
-			var otherTableDescs []sqlbase.TableDescriptor
-			if backfillType == columnBackfill {
-				fkTables, err := row.MakeFkMetadata(
-					ctx,
-					tableDesc,
-					row.CheckUpdates,
-					row.NoLookup,
-					row.NoCheckPrivilege,
-					nil, /* AnalyzeExprFunction */
-					nil, /* CheckHelper */
-				)
-				if err != nil {
-					return err
-				}
-
-				for k := range fkTables {
-					table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
-					if err != nil {
-						return err
-					}
-					otherTableDescs = append(otherTableDescs, *table.TableDesc())
-				}
-			}
-			rw := &errOnlyResultWriter{}
-			recv := MakeDistSQLReceiver(
-				ctx,
-				rw,
-				tree.Rows, /* stmtType - doesn't matter here since no results are produced */
-				sc.rangeDescriptorCache,
-				sc.leaseHolderCache,
-				nil, /* txn - the flow does not run wholly in a txn */
-				func(ts hlc.Timestamp) {
-					_ = sc.clock.Update(ts)
-				},
-				evalCtx.Tracing,
-			)
-			defer recv.Release()
-			planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
-			plan, err := sc.distSQLPlanner.createBackfiller(
-				planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
-			)
-			if err != nil {
-				return err
-			}
-			sc.distSQLPlanner.Run(
-				planCtx,
-				nil, /* txn - the processors manage their own transactions */
-				&plan, recv, evalCtx,
-				nil, /* finishedSetupFn */
-			)
-			return rw.Err()
-		}); err != nil {
-			return err
-		}
-	}
-	return nil
+	// start a background goroutine to extend the lease minutely.
+	extendLeases := make(chan struct{})
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(func(ctx context.Context) error {
+		tick := time.NewTicker(schemaChangeLeaseDuration.Get(&sc.settings.SV) / time.Duration(4))
+		defer tick.Stop()
+		ctxDone := ctx.Done()
+		for {
+			select {
+			case <-extendLeases:
+				return nil
+			case <-ctxDone:
+				return nil
+			case <-tick.C:
+				if err := sc.ExtendLease(ctx, lease); err != nil {
+					return err
+				}
+			}
+		}
+	})
+
+	g.GoCtx(func(ctx context.Context) error {
+		defer close(extendLeases)
+		origNRanges := -1
+		origFractionCompleted := sc.job.FractionCompleted()
+		fractionLeft := 1 - origFractionCompleted
+		readAsOf := sc.clock.Now()
+		for {
+			var spans []roachpb.Span
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				var err error
+				spans, _, _, err = distsqlrun.GetResumeSpans(
+					ctx, sc.jobRegistry, txn, sc.tableID, sc.mutationID, filter)
+				return err
+			}); err != nil {
+				return err
+			}
+
+			if len(spans) <= 0 {
+				break
+			}
+
+			log.VEventf(ctx, 2, "backfill: process %+v spans", spans)
+			if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+				// Report schema change progress. We define progress at this point
+				// as the fraction of fully-backfilled ranges of the primary index of
+				// the table being scanned. Since we may have already modified the
+				// fraction completed of our job from the 10% allocated to completing the
+				// schema change state machine or from a previous backfill attempt,
+				// we scale that fraction of ranges completed by the remaining fraction
+				// of the job's progress bar.
+				nRanges, err := sc.nRanges(ctx, txn, spans)
+				if err != nil {
+					return err
+				}
+				if origNRanges == -1 {
+					origNRanges = nRanges
+				}
+
+				if nRanges < origNRanges {
+					fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
+					fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
+					if err := sc.job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)); err != nil {
+						return jobs.SimplifyInvalidStatusError(err)
+					}
+				}
+
+				tc := &TableCollection{leaseMgr: sc.leaseMgr}
+				// Use a leased table descriptor for the backfill.
+				defer tc.releaseTables(ctx)
+				tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
+				if err != nil {
+					return err
+				}
+				// otherTableDescs contains any other table descriptors required by the
+				// backfiller processor.
+				var otherTableDescs []sqlbase.TableDescriptor
+				if backfillType == columnBackfill {
+					fkTables, err := row.MakeFkMetadata(
+						ctx,
+						tableDesc,
+						row.CheckUpdates,
+						row.NoLookup,
+						row.NoCheckPrivilege,
+						nil, /* AnalyzeExprFunction */
+						nil, /* CheckHelper */
+					)
+					if err != nil {
+						return err
+					}
+
+					for k := range fkTables {
+						table, err := tc.getTableVersionByID(ctx, txn, k, ObjectLookupFlags{})
+						if err != nil {
+							return err
+						}
+						otherTableDescs = append(otherTableDescs, *table.TableDesc())
+					}
+				}
+				rw := &errOnlyResultWriter{}
+				recv := MakeDistSQLReceiver(
+					ctx,
+					rw,
+					tree.Rows, /* stmtType - doesn't matter here since no results are produced */
+					sc.rangeDescriptorCache,
+					sc.leaseHolderCache,
+					nil, /* txn - the flow does not run wholly in a txn */
+					func(ts hlc.Timestamp) {
+						_ = sc.clock.Update(ts)
+					},
+					evalCtx.Tracing,
+				)
+				defer recv.Release()
+				planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, evalCtx, txn)
+				plan, err := sc.distSQLPlanner.createBackfiller(
+					planCtx, backfillType, *tableDesc.TableDesc(), duration, chunkSize, spans, otherTableDescs, readAsOf,
+				)
+				if err != nil {
+					return err
+				}
+				sc.distSQLPlanner.Run(
+					planCtx,
+					nil, /* txn - the processors manage their own transactions */
+					&plan, recv, evalCtx,
+					nil, /* finishedSetupFn */
+				)
+				return rw.Err()
+			}); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+	return g.Wait()
 }
 
 	// validate the new indexes being added
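To make the progress arithmetic above concrete (illustrative numbers, not taken from the commit): if a prior attempt had already moved the job to origFractionCompleted = 0.1, then fractionLeft = 0.9; if the backfill began with origNRanges = 100 and a later iteration finds only nRanges = 40 ranges left in the resume spans, then fractionRangesFinished = (100-40)/100 = 0.6 and fractionCompleted = 0.1 + 0.9*0.6 = 0.64, i.e. the job reports 64% complete.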
