Skip to content

Commit

Permalink
Merge pull request #107705 from postamar/backport23.1-107636
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta authored Jul 27, 2023
2 parents aaedb3d + e38e750 commit d452037
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
17 changes: 6 additions & 11 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
ctx context.Context, descID descpb.ID, scs SchemaChangerState,
) error {

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs
if knobs != nil && knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs.BeforeWaitingForConcurrentSchemaChanges(scs.stmts)
}

Expand All @@ -196,7 +196,6 @@ func (p *planner) waitForDescriptorSchemaChanges(
// Wait for the descriptor to no longer be claimed by a schema change.
start := timeutil.Now()
logEvery := log.Every(10 * time.Second)
var wasBlocked bool
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
now := p.ExecCfg().Clock.Now()
var isBlocked bool
Expand All @@ -215,9 +214,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
}); err != nil {
return err
}
if isBlocked {
wasBlocked = true
} else {
if !isBlocked {
break
}
if logEvery.ShouldLog() {
Expand All @@ -226,11 +223,9 @@ func (p *planner) waitForDescriptorSchemaChanges(
" waited %v so far", descID, timeutil.Since(start),
)
}
}

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.AfterWaitingForConcurrentSchemaChanges != nil {
knobs.AfterWaitingForConcurrentSchemaChanges(scs.stmts, wasBlocked)
if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil {
knobs.WhileWaitingForConcurrentSchemaChanges(scs.stmts)
}
}

log.Infof(
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type TestingKnobs struct {
// for concurrent schema changes to finish.
BeforeWaitingForConcurrentSchemaChanges func(stmts []string)

// AfterWaitingForConcurrentSchemaChanges is called at the end of waiting
// WhileWaitingForConcurrentSchemaChanges is called while waiting
// for concurrent schema changes to finish.
AfterWaitingForConcurrentSchemaChanges func(stmts []string, wasBlocked bool)
WhileWaitingForConcurrentSchemaChanges func(stmts []string)

// OnPostCommitPlanError is called whenever the schema changer job returns an
// error on building the state or on planning the stages.
Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) {
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
for _, stmt := range stmts {
if wasBlocked && strings.Contains(stmt, "ADD COLUMN") {
if strings.Contains(stmt, "ADD COLUMN") {
addColumnBlockedCounter.Add(1)
return
}
Expand Down Expand Up @@ -161,6 +161,14 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
wg.Done()
}()

// The ADD COLUMN schema change must block.
testutils.SucceedsSoon(t, func() error {
if addColumnBlockedCounter.Load() == 0 {
return errors.New("waiting for concurrent schema change to block")
}
return nil
})

// Unblock the create index job.
continueNotif <- struct{}{}
wg.Wait()
Expand Down

0 comments on commit d452037

Please sign in to comment.