diff --git a/pkg/sql/drop_database.go b/pkg/sql/drop_database.go index babf3a549494..66148ba0c50c 100644 --- a/pkg/sql/drop_database.go +++ b/pkg/sql/drop_database.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -128,6 +129,12 @@ func (n *dropDatabaseNode) startExec(params runParams) error { ctx := params.ctx p := params.p + // Exit early with an error if the schema is undergoing a declarative schema + // change. + if n.dbDesc.HasConcurrentSchemaChanges() { + return scerrors.ConcurrentSchemaChangeError(n.dbDesc) + } + // Drop all of the collected objects. if err := n.d.dropAllCollectedObjects(ctx, p); err != nil { return err diff --git a/pkg/sql/drop_function.go b/pkg/sql/drop_function.go index 60f0c7e96e49..d5335df63449 100644 --- a/pkg/sql/drop_function.go +++ b/pkg/sql/drop_function.go @@ -173,7 +173,7 @@ func (p *planner) dropFunctionImpl(ctx context.Context, fnMutable *funcdesc.Muta // Exit early with an error if the function is undergoing a declarative schema // change, before we try to get job IDs and update job statuses later. See // createOrUpdateSchemaChangeJob. - if fnMutable.GetDeclarativeSchemaChangerState() != nil { + if fnMutable.HasConcurrentSchemaChanges() { return scerrors.ConcurrentSchemaChangeError(fnMutable) } diff --git a/pkg/sql/drop_schema.go b/pkg/sql/drop_schema.go index 835d164f318a..b8a580335ec9 100644 --- a/pkg/sql/drop_schema.go +++ b/pkg/sql/drop_schema.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" @@ -209,6 +210,12 @@ func (p *planner) dropSchemaImpl( // Update parent database schemas mapping. delete(parentDB.Schemas, sc.GetName()) + // Exit early with an error if the schema is undergoing a declarative schema + // change. + if sc.HasConcurrentSchemaChanges() { + return scerrors.ConcurrentSchemaChangeError(sc) + } + // Update the schema descriptor as dropped. sc.SetDropped() diff --git a/pkg/sql/drop_table.go b/pkg/sql/drop_table.go index 8fc4ab83ad4e..939c8dff2db7 100644 --- a/pkg/sql/drop_table.go +++ b/pkg/sql/drop_table.go @@ -344,7 +344,7 @@ func (p *planner) initiateDropTable( // Exit early with an error if the table is undergoing a declarative schema // change, before we try to get job IDs and update job statuses later. See // createOrUpdateSchemaChangeJob. - if tableDesc.GetDeclarativeSchemaChangerState() != nil { + if tableDesc.HasConcurrentSchemaChanges() { return scerrors.ConcurrentSchemaChangeError(tableDesc) } diff --git a/pkg/sql/drop_type.go b/pkg/sql/drop_type.go index 0e594104f3fe..c7af4afac745 100644 --- a/pkg/sql/drop_type.go +++ b/pkg/sql/drop_type.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" @@ -256,6 +257,12 @@ func (p *planner) dropTypeImpl( return errors.Errorf("type %q is already being dropped", typeDesc.Name) } + // Exit early with an error if the type is undergoing a declarative schema + // change. + if typeDesc.HasConcurrentSchemaChanges() { + return scerrors.ConcurrentSchemaChangeError(typeDesc) + } + // Actually mark the type as dropped. typeDesc.SetDropped() diff --git a/pkg/sql/schemachanger/schemachanger_test.go b/pkg/sql/schemachanger/schemachanger_test.go index 187389810b9f..08a6b58d3e24 100644 --- a/pkg/sql/schemachanger/schemachanger_test.go +++ b/pkg/sql/schemachanger/schemachanger_test.go @@ -16,6 +16,7 @@ import ( "encoding/hex" "fmt" "regexp" + "strings" "sync" "sync/atomic" "testing" @@ -508,12 +509,21 @@ func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) { func TestConcurrentSchemaChangesWait(t *testing.T) { defer leaktest.AfterTest(t)() + const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1` + type concurrentWaitTest struct { + // initial statement run under the declarative schema changer, paused on + // the first post commit phase. + initial string + // concurrent statement run under the legacy schema changer + concurrent string + } ctx := context.Background() - runConcurrentSchemaChangeCase := func(t *testing.T, stmt string, implicit bool) { + runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) { defer log.Scope(t).Close(t) var doOnce sync.Once - // Closed when we enter the RunBeforeBackfill knob. - beforeBackfillNotification := make(chan struct{}) + // Closed when we enter the BeforeStage knob with a post commit or later + // phase. + beforePostCommitNotification := make(chan struct{}) // Closed when we're ready to continue with the schema change. continueNotification := make(chan struct{}) // Sent on when we're waiting for the initial schema change. @@ -538,17 +548,13 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { assert.LessOrEqual(t, int(m.MutationID()), 2) } s := p.Stages[idx] - if s.Type() != scop.BackfillType { + if s.Phase < scop.PostCommitPhase { return nil } - for _, op := range s.EdgeOps { - if _, ok := op.(*scop.BackfillIndex); ok { - doOnce.Do(func() { - close(beforeBackfillNotification) - <-continueNotification - }) - } - } + doOnce.Do(func() { + close(beforePostCommitNotification) + <-continueNotification + }) return nil }, }, @@ -567,8 +573,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { } _, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`) assert.NoError(t, err) - _, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`) - assert.NoError(t, err) + for _, s := range strings.Split(stmts.initial, ";") { + _, err = conn.ExecContext(ctx, s) + assert.NoError(t, err) + } return nil } concurrentSchemaChangeImplicit := func() error { @@ -577,10 +585,9 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { return err } defer func() { _ = conn.Close() }() - for _, s := range []string{ + for _, s := range append([]string{ `SET use_declarative_schema_changer = 'off'`, - stmt, - } { + }, strings.Split(stmts.concurrent, ";")...) { if _, err = conn.ExecContext(ctx, s); err != nil { return err } @@ -613,8 +620,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil { return err } - if _, err := tx.Exec(stmt); err != nil { - return err + for _, s := range strings.Split(stmts.concurrent, ";") { + if _, err := tx.ExecContext(ctx, s); err != nil { + return err + } } return tx.Commit() }) @@ -624,10 +633,15 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { tdb.Exec(t, `CREATE DATABASE db`) tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`) tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`) + tdb.Exec(t, `CREATE USER testuser`) + tdb.Exec(t, `CREATE SCHEMA db.sc`) + tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`) + tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`) + tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`) var initialSchemaChangeGroup errgroup.Group var concurrentSchemaChangeGroup errgroup.Group initialSchemaChangeGroup.Go(initialSchemaChange) - <-beforeBackfillNotification + <-beforePostCommitNotification if implicit { concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit) } else { @@ -639,19 +653,20 @@ func TestConcurrentSchemaChangesWait(t *testing.T) { require.NoError(t, concurrentSchemaChangeGroup.Wait()) } - stmts := []string{ - `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`, - `CREATE INDEX ON db.t(a)`, - `ALTER TABLE db.t RENAME COLUMN a TO c`, - `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`, - `CREATE VIEW db.v AS SELECT a FROM db.t`, - `ALTER TABLE db.t RENAME TO db.new`, - `TRUNCATE TABLE db.t`, - `DROP TABLE db.t`, + stmts := []concurrentWaitTest{ + {defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`}, + {defaultInitialStmt, `CREATE INDEX ON db.t(a)`}, + {defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`}, + {defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`}, + {defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`}, + {defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`}, + {defaultInitialStmt, `TRUNCATE TABLE db.t`}, + {defaultInitialStmt, `DROP TABLE db.t`}, + {"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`}, } for i := range stmts { stmt := stmts[i] // copy for closure - t.Run(stmt, func(t *testing.T) { + t.Run(stmt.concurrent, func(t *testing.T) { testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) { runConcurrentSchemaChangeCase(t, stmt, implicit) })