Skip to content

Commit

Permalink
sql/schemachanger: block concurrent use of declarative and legacy
Browse files Browse the repository at this point in the history
Fixes: #75604, #88922

Previously, when dropping databases, schemas or
types it was possible that the declarative and
legacy schema changers could potentially conflict.
To avoid issues we will detect inside the legacy
schema changer if the declarative schema changer
is in use when dropping tables.

Release note: None
  • Loading branch information
fqazi committed Oct 5, 2022
1 parent dfd0c6e commit 4a5a489
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 34 deletions.
13 changes: 13 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,3 +987,16 @@ func IsSystemDescriptor(desc Descriptor) bool {
}
return false
}

// HasConcurrentDeclarativeSchemaChange returns true iff the descriptors has
// a concurrent declarative schema change. This declarative schema changer is
// extremely disciplined and only writes state information during the pre-commit
// phase, so if a descriptor has a declarative state, we know an ongoing
// declarative schema change active. The legacy schema changer will tag descriptors
// with job IDs even during the statement phase, so we cannot rely on similar
// checks to block concurrent schema changes. Hence, Descriptor.HasConcurrentSchemaChanges
// is not equivalent to this operation (the former can lead to false positives
// for the legacy schema changer).
func HasConcurrentDeclarativeSchemaChange(desc Descriptor) bool {
return desc.GetDeclarativeSchemaChangerState() != nil
}
7 changes: 7 additions & 0 deletions pkg/sql/drop_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -128,6 +129,12 @@ func (n *dropDatabaseNode) startExec(params runParams) error {
ctx := params.ctx
p := params.p

// Exit early with an error if the schema is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(n.dbDesc) {
return scerrors.ConcurrentSchemaChangeError(n.dbDesc)
}

// Drop all of the collected objects.
if err := n.d.dropAllCollectedObjects(ctx, p); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (p *planner) dropFunctionImpl(ctx context.Context, fnMutable *funcdesc.Muta
// Exit early with an error if the function is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if fnMutable.GetDeclarativeSchemaChangerState() != nil {
if catalog.HasConcurrentDeclarativeSchemaChange(fnMutable) {
return scerrors.ConcurrentSchemaChangeError(fnMutable)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/drop_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
Expand Down Expand Up @@ -209,6 +210,12 @@ func (p *planner) dropSchemaImpl(
// Update parent database schemas mapping.
delete(parentDB.Schemas, sc.GetName())

// Exit early with an error if the schema is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(sc) {
return scerrors.ConcurrentSchemaChangeError(sc)
}

// Update the schema descriptor as dropped.
sc.SetDropped()

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (p *planner) initiateDropTable(
// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/drop_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
Expand Down Expand Up @@ -256,6 +257,12 @@ func (p *planner) dropTypeImpl(
return errors.Errorf("type %q is already being dropped", typeDesc.Name)
}

// Exit early with an error if the type is undergoing a declarative schema
// change.
if catalog.HasConcurrentDeclarativeSchemaChange(typeDesc) {
return scerrors.ConcurrentSchemaChangeError(typeDesc)
}

// Actually mark the type as dropped.
typeDesc.SetDropped()

Expand Down
75 changes: 45 additions & 30 deletions pkg/sql/schemachanger/schemachanger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"encoding/hex"
"fmt"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -508,12 +509,21 @@ func TestSchemaChangeWaitsForOtherSchemaChanges(t *testing.T) {
func TestConcurrentSchemaChangesWait(t *testing.T) {
defer leaktest.AfterTest(t)()

const defaultInitialStmt = `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`
type concurrentWaitTest struct {
// initial statement run under the declarative schema changer, paused on
// the first post commit phase.
initial string
// concurrent statement run under the legacy schema changer
concurrent string
}
ctx := context.Background()
runConcurrentSchemaChangeCase := func(t *testing.T, stmt string, implicit bool) {
runConcurrentSchemaChangeCase := func(t *testing.T, stmts concurrentWaitTest, implicit bool) {
defer log.Scope(t).Close(t)
var doOnce sync.Once
// Closed when we enter the RunBeforeBackfill knob.
beforeBackfillNotification := make(chan struct{})
// Closed when we enter the BeforeStage knob with a post commit or later
// phase.
beforePostCommitNotification := make(chan struct{})
// Closed when we're ready to continue with the schema change.
continueNotification := make(chan struct{})
// Sent on when we're waiting for the initial schema change.
Expand All @@ -538,17 +548,13 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
assert.LessOrEqual(t, int(m.MutationID()), 2)
}
s := p.Stages[idx]
if s.Type() != scop.BackfillType {
if s.Phase < scop.PostCommitPhase {
return nil
}
for _, op := range s.EdgeOps {
if _, ok := op.(*scop.BackfillIndex); ok {
doOnce.Do(func() {
close(beforeBackfillNotification)
<-continueNotification
})
}
}
doOnce.Do(func() {
close(beforePostCommitNotification)
<-continueNotification
})
return nil
},
},
Expand All @@ -567,8 +573,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
}
_, err = conn.ExecContext(ctx, `SET use_declarative_schema_changer = 'unsafe'`)
assert.NoError(t, err)
_, err = conn.ExecContext(ctx, `ALTER TABLE db.t ADD COLUMN b INT DEFAULT 1`)
assert.NoError(t, err)
for _, s := range strings.Split(stmts.initial, ";") {
_, err = conn.ExecContext(ctx, s)
assert.NoError(t, err)
}
return nil
}
concurrentSchemaChangeImplicit := func() error {
Expand All @@ -577,10 +585,9 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
return err
}
defer func() { _ = conn.Close() }()
for _, s := range []string{
for _, s := range append([]string{
`SET use_declarative_schema_changer = 'off'`,
stmt,
} {
}, strings.Split(stmts.concurrent, ";")...) {
if _, err = conn.ExecContext(ctx, s); err != nil {
return err
}
Expand Down Expand Up @@ -613,8 +620,10 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
if _, err := tx.Exec("SELECT * FROM db.other_t"); err != nil {
return err
}
if _, err := tx.Exec(stmt); err != nil {
return err
for _, s := range strings.Split(stmts.concurrent, ";") {
if _, err := tx.ExecContext(ctx, s); err != nil {
return err
}
}
return tx.Commit()
})
Expand All @@ -624,10 +633,15 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
tdb.Exec(t, `CREATE DATABASE db`)
tdb.Exec(t, `CREATE TABLE db.other_t (a INT PRIMARY KEY)`)
tdb.Exec(t, `CREATE TABLE db.t (a INT PRIMARY KEY)`)
tdb.Exec(t, `CREATE USER testuser`)
tdb.Exec(t, `CREATE SCHEMA db.sc`)
tdb.Exec(t, `ALTER SCHEMA db.sc OWNER to testuser`)
tdb.Exec(t, `CREATE TABLE db.sc.t (a INT PRIMARY KEY)`)
tdb.Exec(t, `ALTER TABLE db.sc.t OWNER to testuser`)
var initialSchemaChangeGroup errgroup.Group
var concurrentSchemaChangeGroup errgroup.Group
initialSchemaChangeGroup.Go(initialSchemaChange)
<-beforeBackfillNotification
<-beforePostCommitNotification
if implicit {
concurrentSchemaChangeGroup.Go(concurrentSchemaChangeImplicit)
} else {
Expand All @@ -639,19 +653,20 @@ func TestConcurrentSchemaChangesWait(t *testing.T) {
require.NoError(t, concurrentSchemaChangeGroup.Wait())
}

stmts := []string{
`ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`,
`CREATE INDEX ON db.t(a)`,
`ALTER TABLE db.t RENAME COLUMN a TO c`,
`CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`,
`CREATE VIEW db.v AS SELECT a FROM db.t`,
`ALTER TABLE db.t RENAME TO db.new`,
`TRUNCATE TABLE db.t`,
`DROP TABLE db.t`,
stmts := []concurrentWaitTest{
{defaultInitialStmt, `ALTER TABLE db.t ADD COLUMN c INT DEFAULT 2`},
{defaultInitialStmt, `CREATE INDEX ON db.t(a)`},
{defaultInitialStmt, `ALTER TABLE db.t RENAME COLUMN a TO c`},
{defaultInitialStmt, `CREATE TABLE db.t2 (i INT PRIMARY KEY, a INT REFERENCES db.t)`},
{defaultInitialStmt, `CREATE VIEW db.v AS SELECT a FROM db.t`},
{defaultInitialStmt, `ALTER TABLE db.t RENAME TO db.new`},
{defaultInitialStmt, `TRUNCATE TABLE db.t`},
{defaultInitialStmt, `DROP TABLE db.t`},
{"USE db; DROP OWNED BY testuser;", `DROP DATABASE db`},
}
for i := range stmts {
stmt := stmts[i] // copy for closure
t.Run(stmt, func(t *testing.T) {
t.Run(stmt.concurrent, func(t *testing.T) {
testutils.RunTrueAndFalse(t, "implicit", func(t *testing.T, implicit bool) {
runConcurrentSchemaChangeCase(t, stmt, implicit)
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (p *planner) createOrUpdateSchemaChangeJob(
// changer, then we must fail and wait for that schema change to conclude.
// The error here will be dealt with in
// (*connExecutor).handleWaitingForConcurrentSchemaChanges().
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/truncate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -176,7 +177,7 @@ func (p *planner) truncateTable(ctx context.Context, id descpb.ID, jobDesc strin
// Exit early with an error if the table is undergoing a declarative schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if tableDesc.GetDeclarativeSchemaChangerState() != nil {
if catalog.HasConcurrentDeclarativeSchemaChange(tableDesc) {
return scerrors.ConcurrentSchemaChangeError(tableDesc)
}

Expand Down

0 comments on commit 4a5a489

Please sign in to comment.