Skip to content

Commit

Permalink
sql/schemachanger: block concurrent use of declarative and legacy
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#75604, cockroachdb#88922

Previously, when dropping databases, schemas or
types it was possible that the declarative and
legacy schema changers could potentially conflict.
To avoid issues we will detect inside the legacy
schema changer if the declarative schema changer
is in use when dropping tables.

Release note: None
  • Loading branch information
fqazi committed Sep 29, 2022
1 parent 13600be commit f34510f
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 32 deletions.
7 changes: 7 additions & 0 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -128,6 +129,12 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p

// Exit early with an error if the schema is undergoing a declarative schema
// change.
if n.dbDesc.HasConcurrentSchemaChanges() {
return scerrors.ConcurrentSchemaChangeError(n.dbDesc)
}

// Drop all of the collected objects.
if err := n.d.dropAllCollectedObjects(ctx, p); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (p *planner) dropFunctionImpl(ctx context.Context, fnMutable *funcdesc.Muta
// Exit early with an error if the function is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if fnMutable.GetDeclarativeSchemaChangerState() != nil {
if fnMutable.HasConcurrentSchemaChanges() {
return scerrors.ConcurrentSchemaChangeError(fnMutable)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/drop_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
Expand Down Expand Up @@ -209,6 +210,12 @@ func (p *planner) dropSchemaImpl(
// Update parent database schemas mapping.
delete(parentDB.Schemas, sc.GetName())

// Exit early with an error if the schema is undergoing a declarative schema
// change.
if sc.HasConcurrentSchemaChanges() {
return scerrors.ConcurrentSchemaChangeError(sc)
}

// Update the schema descriptor as dropped.
sc.SetDropped()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (p *planner) initiateDropTable(
// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
if tableDesc.HasConcurrentSchemaChanges() {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/drop_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -256,6 +257,12 @@ func (p *planner) dropTypeImpl(
return errors.Errorf("type %q is already being dropped", typeDesc.Name)
}

// Exit early with an error if the type is undergoing a declarative schema
// change.
if typeDesc.HasConcurrentSchemaChanges() {
return scerrors.ConcurrentSchemaChangeError(typeDesc)
}

// Actually mark the type as dropped.
typeDesc.SetDropped()

Expand Down
75 changes: 45 additions & 30 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"encoding/hex"
"fmt"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -508,12 +509,21 @@ func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) {
func TestConcurrentSchemaChangesWait(t *testing.T) {
defer leaktest.AfterTest(t)()

const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`
type concurrentWaitTest struct {
// initial statement run under the declarative schema changer, paused on
// the first post commit phase.
initial string
// concurrent statement run under the legacy schema changer
concurrent string
}
ctx := context.Background()
runConcurrentSchemaChangeCase := func(t *testing.T, stmt string, implicit bool) {
runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) {
defer log.Scope(t).Close(t)
var doOnce sync.Once
// Closed when we enter the RunBeforeBackfill knob.
beforeBackfillNotification := make(chan struct{})
// Closed when we enter the BeforeStage knob with a post commit or later
// phase.
beforePostCommitNotification := make(chan struct{})
// Closed when we're ready to continue with the schema change.
continueNotification := make(chan struct{})
// Sent on when we're waiting for the initial schema change.
Expand All @@ -538,17 +548,13 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
assert.LessOrEqual(t, int(m.MutationID()), 2)
}
s := p.Stages[idx]
if s.Type() != scop.BackfillType {
if s.Phase < scop.PostCommitPhase {
return nil
}
for _, op := range s.EdgeOps {
if _, ok := op.(*scop.BackfillIndex); ok {
doOnce.Do(func() {
close(beforeBackfillNotification)
<-continueNotification
})
}
}
doOnce.Do(func() {
close(beforePostCommitNotification)
<-continueNotification
})
return nil
},
},
Expand All @@ -567,8 +573,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
}
_, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`)
assert.NoError(t, err)
_, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`)
assert.NoError(t, err)
for _, s := range strings.Split(stmts.initial, ";") {
_, err = conn.ExecContext(ctx, s)
assert.NoError(t, err)
}
return nil
}
concurrentSchemaChangeImplicit := func() error {
Expand All @@ -577,10 +585,9 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
return err
}
defer func() { _ = conn.Close() }()
for _, s := range []string{
for _, s := range append([]string{
`SET use_declarative_schema_changer = 'off'`,
stmt,
} {
}, strings.Split(stmts.concurrent, ";")...) {
if _, err = conn.ExecContext(ctx, s); err != nil {
return err
}
Expand Down Expand Up @@ -613,8 +620,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil {
return err
}
if _, err := tx.Exec(stmt); err != nil {
return err
for _, s := range strings.Split(stmts.concurrent, ";") {
if _, err := tx.ExecContext(ctx, s); err != nil {
return err
}
}
return tx.Commit()
})
Expand All @@ -624,10 +633,15 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
tdb.Exec(t, `CREATE DATABASE db`)
tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`)
tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`)
tdb.Exec(t, `CREATE USER testuser`)
tdb.Exec(t, `CREATE SCHEMA db.sc`)
tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`)
tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`)
tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`)
var initialSchemaChangeGroup errgroup.Group
var concurrentSchemaChangeGroup errgroup.Group
initialSchemaChangeGroup.Go(initialSchemaChange)
<-beforeBackfillNotification
<-beforePostCommitNotification
if implicit {
concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit)
} else {
Expand All @@ -639,19 +653,20 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
require.NoError(t, concurrentSchemaChangeGroup.Wait())
}

stmts := []string{
`ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`,
`CREATE INDEX ON db.t(a)`,
`ALTER TABLE db.t RENAME COLUMN a TO c`,
`CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`,
`CREATE VIEW db.v AS SELECT a FROM db.t`,
`ALTER TABLE db.t RENAME TO db.new`,
`TRUNCATE TABLE db.t`,
`DROP TABLE db.t`,
stmts := []concurrentWaitTest{
{defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`},
{defaultInitialStmt, `CREATE INDEX ON db.t(a)`},
{defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`},
{defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`},
{defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`},
{defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`},
{defaultInitialStmt, `TRUNCATE TABLE db.t`},
{defaultInitialStmt, `DROP TABLE db.t`},
{"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`},
}
for i := range stmts {
stmt := stmts[i] // copy for closure
t.Run(stmt, func(t *testing.T) {
t.Run(stmt.concurrent, func(t *testing.T) {
testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) {
runConcurrentSchemaChangeCase(t, stmt, implicit)
})
Expand Down

0 comments on commit f34510f

Please sign in to comment.