Skip to content

Commit

Permalink
schemachanger: deflake TestConcurrentDeclarativeSchemaChanges
Browse files Browse the repository at this point in the history
This commit deflakes this test by checking that the second schema change
actually does block because of the first one, rather than checking that
it has blocked. The bug was that the latter wasn't always guaranteed to
happen because we didn't force the schema changes to run in parallel.

Fixes #106732.

Release note: None
  • Loading branch information
Marius Posta committed Jul 26, 2023
1 parent c10ca05 commit abc0966
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
17 changes: 6 additions & 11 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (p *planner) waitForDescriptorSchemaChanges(
ctx context.Context, descID descpb.ID, scs SchemaChangerState,
) error {

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs
if knobs != nil && knobs.BeforeWaitingForConcurrentSchemaChanges != nil {
knobs.BeforeWaitingForConcurrentSchemaChanges(scs.stmts)
}

Expand All @@ -199,7 +199,6 @@ func (p *planner) waitForDescriptorSchemaChanges(
// Wait for the descriptor to no longer be claimed by a schema change.
start := timeutil.Now()
logEvery := log.Every(10 * time.Second)
var wasBlocked bool
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
now := p.ExecCfg().Clock.Now()
var isBlocked bool
Expand All @@ -218,9 +217,7 @@ func (p *planner) waitForDescriptorSchemaChanges(
}); err != nil {
return err
}
if isBlocked {
wasBlocked = true
} else {
if !isBlocked {
break
}
if logEvery.ShouldLog() {
Expand All @@ -229,11 +226,9 @@ func (p *planner) waitForDescriptorSchemaChanges(
" waited %v so far", descID, timeutil.Since(start),
)
}
}

if knobs := p.ExecCfg().DeclarativeSchemaChangerTestingKnobs; knobs != nil &&
knobs.AfterWaitingForConcurrentSchemaChanges != nil {
knobs.AfterWaitingForConcurrentSchemaChanges(scs.stmts, wasBlocked)
if knobs != nil && knobs.WhileWaitingForConcurrentSchemaChanges != nil {
knobs.WhileWaitingForConcurrentSchemaChanges(scs.stmts)
}
}

log.Infof(
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schemachanger/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ go_test(
"//pkg/sql/sqltestutils",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/ctxgroup",
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scexec/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type TestingKnobs struct {
// for concurrent schema changes to finish.
BeforeWaitingForConcurrentSchemaChanges func(stmts []string)

// AfterWaitingForConcurrentSchemaChanges is called at the end of waiting
// WhileWaitingForConcurrentSchemaChanges is called while waiting
// for concurrent schema changes to finish.
AfterWaitingForConcurrentSchemaChanges func(stmts []string, wasBlocked bool)
WhileWaitingForConcurrentSchemaChanges func(stmts []string)

// OnPostCommitPlanError is called whenever the schema changer job returns an
// error on building the state or on planning the stages.
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand All @@ -58,7 +57,6 @@ import (
// schema changes operating on the same descriptors are performed serially.
func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 106732, "flaky test")
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -99,9 +97,9 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
var params base.TestServerArgs
params.Knobs = base.TestingKnobs{
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
AfterWaitingForConcurrentSchemaChanges: func(stmts []string, wasBlocked bool) {
WhileWaitingForConcurrentSchemaChanges: func(stmts []string) {
for _, stmt := range stmts {
if wasBlocked && strings.Contains(stmt, "ALTER PRIMARY KEY") {
if strings.Contains(stmt, "ALTER PRIMARY KEY") {
alterPrimaryKeyBlockedCounter.Add(1)
return
}
Expand Down Expand Up @@ -161,6 +159,14 @@ func TestConcurrentDeclarativeSchemaChanges(t *testing.T) {
wg.Done()
}()

// The ALTER PRIMARY KEY schema change must block.
testutils.SucceedsSoon(t, func() error {
if alterPrimaryKeyBlockedCounter.Load() == 0 {
return errors.New("waiting for concurrent schema change to block")
}
return nil
})

// Unblock the create index job.
continueNotif <- struct{}{}
wg.Wait()
Expand Down

0 comments on commit abc0966

Please sign in to comment.