From e38e7504787360c969f2e94cd7f845d0e9617390 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 26 Jul 2023 11:54:18 -0400 Subject: [PATCH] schemachanger: deflake TestConcurrentDeclarativeSchemaChanges This commit deflakes this test by checking that the second schema change actually does block because of the first one, rather than checking that it has blocked. The bug was that the latter wasn't always guaranteed to happen because we didn't force the schema changes to run in parallel. Fixes #106732. Release note: None --- pkg/sql/schema_change_plan_node.go | 17 ++++++----------- pkg/sql/schemachanger/scexec/testing_knobs.go | 4 ++-- pkg/sql/schemachanger/schemachanger_test.go | 12 ++++++++++-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go index dfd4750cb3ac..f232d9b66e7c 100644 --- a/pkg/sql/schema_change_plan_node.go +++ b/pkg/sql/schema_change_plan_node.go @@ -181,8 +181,8 @@ func (p *planner) waitForDescriptorSchemaChanges( ctx context.Context, descID descpb.ID, scs SchemaChangerState, ) error { - if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil && - knobs.BeforeWaitingForConcurrentSchemaChanges != nil { + knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs + if knobs != nil && knobs.BeforeWaitingForConcurrentSchemaChanges != nil { knobs.BeforeWaitingForConcurrentSchemaChanges(scs.stmts) } @@ -196,7 +196,6 @@ func (p *planner) waitForDescriptorSchemaChanges( // Wait for the descriptor to no longer be claimed by a schema change. start := timeutil.Now() logEvery := log.Every(10 * time.Second) - var wasBlocked bool for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { now := p.ExecCfg().Clock.Now() var isBlocked bool @@ -215,9 +214,7 @@ func (p *planner) waitForDescriptorSchemaChanges( }); err != nil { return err } - if isBlocked { - wasBlocked = true - } else { + if !isBlocked { break } if logEvery.ShouldLog() { @@ -226,11 +223,9 @@ func (p *planner) waitForDescriptorSchemaChanges( " waited %v so far", descID, timeutil.Since(start), ) } - } - - if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil && - knobs.AfterWaitingForConcurrentSchemaChanges != nil { - knobs.AfterWaitingForConcurrentSchemaChanges(scs.stmts, wasBlocked) + if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil { + knobs.WhileWaitingForConcurrentSchemaChanges(scs.stmts) + } } log.Infof( diff --git a/pkg/sql/schemachanger/scexec/testing_knobs.go b/pkg/sql/schemachanger/scexec/testing_knobs.go index 684318390ece..bd3d7fd65f20 100644 --- a/pkg/sql/schemachanger/scexec/testing_knobs.go +++ b/pkg/sql/schemachanger/scexec/testing_knobs.go @@ -27,9 +27,9 @@ type TestingKnobs struct { // for concurrent schema changes to finish. BeforeWaitingForConcurrentSchemaChanges func(stmts []string) - // AfterWaitingForConcurrentSchemaChanges is called at the end of waiting + // WhileWaitingForConcurrentSchemaChanges is called while waiting // for concurrent schema changes to finish. - AfterWaitingForConcurrentSchemaChanges func(stmts []string, wasBlocked bool) + WhileWaitingForConcurrentSchemaChanges func(stmts []string) // OnPostCommitPlanError is called whenever the schema changer job returns an // error on building the state or on planning the stages. diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 0c5736519533..3cec12adce61 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -100,9 +100,9 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{ - AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) { + WhileWaitingForConcurrentSchemaChanges: func(stmts []string) { for _, stmt := range stmts { - if wasBlocked && strings.Contains(stmt, "ADD COLUMN") { + if strings.Contains(stmt, "ADD COLUMN") { addColumnBlockedCounter.Add(1) return } @@ -161,6 +161,14 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) { wg.Done() }() + // The ADD COLUMN schema change must block. + testutils.SucceedsSoon(t, func() error { + if addColumnBlockedCounter.Load() == 0 { + return errors.New("waiting for concurrent schema change to block") + } + return nil + }) + // Unblock the create index job. continueNotif <- struct{}{} wg.Wait()