Skip to content

Commit

Permalink
Merge #38377 #38382
Browse files Browse the repository at this point in the history
38377: sql: fix double-adding FK backreferences when retrying r=lucy-zhang a=lucy-zhang

Currently, `PublishMultiple()` on the lease manager, which updates multiple
table descriptors in a single transaction as part of a schema change, updates
each table descriptor independently of the others. There was a bug where if the
call to `PublishMultiple()` to add FKs and backreferences was retried (e.g., if
there was a crash after this step but before the finalization of the schema
change), we would correctly avoid re-adding the reference to the table, but the
backreferences would be incorrectly added a second time.

This change updates the interface of `PublishMultiple()`: There's now a single
update closure which has access to a map of all table descriptors being
modified. Backreferences are now only installed if the forward reference was
also installed.

Release note: None

38382: sql: add support for NOT VALID check constraints r=Tyler314 a=Tyler314

Mark constraint as unvalidated if user specifies NOT VALID in their
check constraint. Within backfill, do not add the unvalidated constraints
to the queues for validating.

Release note (sql change): Support NOT VALID for check constraints,
which supports not checking constraints for existing rows.

Co-authored-by: Lucy Zhang <[email protected]>
Co-authored-by: Tyler314 <[email protected]>
  • Loading branch information
3 people committed Jun 25, 2019
3 parents e42c5da + 455b95d + a5317d3 commit fa68533
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 107 deletions.
8 changes: 6 additions & 2 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,12 @@ func (n *alterTableNode) startExec(params runParams) error {
if err != nil {
return err
}
ck.Validity = sqlbase.ConstraintValidity_Validating
n.tableDesc.AddCheckValidationMutation(ck)
if t.ValidationBehavior == tree.ValidationDefault {
ck.Validity = sqlbase.ConstraintValidity_Validating
} else {
ck.Validity = sqlbase.ConstraintValidity_Unvalidated
}
n.tableDesc.AddCheckMutation(ck)

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
Expand Down
94 changes: 54 additions & 40 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (sc *SchemaChanger) runBackfill(
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexSpans []roachpb.Span

var constraintsToAdd []sqlbase.ConstraintToUpdate
var constraintsToAddBeforeValidation []sqlbase.ConstraintToUpdate
var constraintsToValidate []sqlbase.ConstraintToUpdate

tableDesc, err := sc.updateJobRunningStatus(ctx, RunningStatusBackfill)
Expand Down Expand Up @@ -156,8 +156,24 @@ func (sc *SchemaChanger) runBackfill(
case *sqlbase.DescriptorMutation_Index:
addedIndexSpans = append(addedIndexSpans, tableDesc.IndexSpan(t.Index.ID))
case *sqlbase.DescriptorMutation_Constraint:
constraintsToAdd = append(constraintsToAdd, *t.Constraint)
constraintsToValidate = append(constraintsToValidate, *t.Constraint)
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
if t.Constraint.Check.Validity == sqlbase.ConstraintValidity_Validating {
constraintsToAddBeforeValidation = append(constraintsToAddBeforeValidation, *t.Constraint)
constraintsToValidate = append(constraintsToValidate, *t.Constraint)
}
// TODO (tyler): we do not yet support teh NOT VALID foreign keys,
// because we don't add the Foreign Key mutations
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
if t.Constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Validating {
constraintsToAddBeforeValidation = append(constraintsToAddBeforeValidation, *t.Constraint)
constraintsToValidate = append(constraintsToValidate, *t.Constraint)
}
case sqlbase.ConstraintToUpdate_NOT_NULL:
// NOT NULL constraints are always validated before they can be added
constraintsToAddBeforeValidation = append(constraintsToAddBeforeValidation, *t.Constraint)
constraintsToValidate = append(constraintsToValidate, *t.Constraint)
}
default:
return errors.AssertionFailedf(
"unsupported mutation: %+v", m)
Expand Down Expand Up @@ -218,8 +234,8 @@ func (sc *SchemaChanger) runBackfill(
// a constraint references both public and non-public columns), and 2) the
// validation occurs only when the entire cluster is already enforcing the
// constraint on insert/update.
if len(constraintsToAdd) > 0 {
if err := sc.AddConstraints(ctx, constraintsToAdd); err != nil {
if len(constraintsToAddBeforeValidation) > 0 {
if err := sc.AddConstraints(ctx, constraintsToAddBeforeValidation); err != nil {
return err
}
}
Expand All @@ -246,31 +262,39 @@ func (sc *SchemaChanger) AddConstraints(
fksByBackrefTable[c.ForeignKey.Table] = append(fksByBackrefTable[c.ForeignKey.Table], c)
}
}
tableIDsToUpdate := make([]sqlbase.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

// Create map of update closures for the table and all other tables with backreferences
updates := make(map[sqlbase.ID]func(descriptor *sqlbase.MutableTableDescriptor) error)
updates[sc.tableID] = func(desc *sqlbase.MutableTableDescriptor) error {
// Create update closure for the table and all other tables with backreferences
update := func(descs map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error {
scTable, ok := descs[sc.tableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
for i := range constraints {
added := &constraints[i]
switch added.ConstraintType {
constraint := &constraints[i]
switch constraint.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK, sqlbase.ConstraintToUpdate_NOT_NULL:
found := false
for _, c := range desc.Checks {
if c.Name == added.Name {
for _, c := range scTable.Checks {
if c.Name == constraint.Name {
log.VEventf(
ctx, 2,
"backfiller tried to add constraint %+v but found existing constraint %+v, presumably due to a retry",
added, c,
constraint, c,
)
found = true
break
}
}
if !found {
desc.Checks = append(desc.Checks, &constraints[i].Check)
scTable.Checks = append(scTable.Checks, &constraints[i].Check)
}
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
idx, err := desc.FindIndexByID(added.ForeignKeyIndex)
idx, err := scTable.FindIndexByID(constraint.ForeignKeyIndex)
if err != nil {
return err
}
Expand All @@ -279,39 +303,29 @@ func (sc *SchemaChanger) AddConstraints(
log.VEventf(
ctx, 2,
"backfiller tried to add constraint %+v but found existing constraint %+v, presumably due to a retry",
added, idx.ForeignKey,
constraint, idx.ForeignKey,
)
}
} else {
idx.ForeignKey = added.ForeignKey
// If there are any backreferences to be added to the same table, add them here
if added.ForeignKey.Table == sc.tableID {
backref := sqlbase.ForeignKeyReference{Table: sc.tableID, Index: added.ForeignKeyIndex}
idx, err := desc.FindIndexByID(added.ForeignKey.Index)
if err != nil {
return err
}
idx.ReferencedBy = append(idx.ReferencedBy, backref)
idx.ForeignKey = constraint.ForeignKey
// Add backreference on the referenced table (which could be the same table)
backref := sqlbase.ForeignKeyReference{Table: sc.tableID, Index: constraint.ForeignKeyIndex}
backrefTable, ok := descs[constraint.ForeignKey.Table]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
backrefIdx, err := backrefTable.FindIndexByID(constraint.ForeignKey.Index)
if err != nil {
return err
}
backrefIdx.ReferencedBy = append(backrefIdx.ReferencedBy, backref)
}
}
}
return nil
}
for id := range fksByBackrefTable {
updates[id] = func(desc *sqlbase.MutableTableDescriptor) error {
for _, c := range fksByBackrefTable[id] {
backref := sqlbase.ForeignKeyReference{Table: sc.tableID, Index: c.ForeignKeyIndex}
idx, err := desc.FindIndexByID(c.ForeignKey.Index)
if err != nil {
return err
}
idx.ReferencedBy = append(idx.ReferencedBy, backref)
}
return nil
}
}
if _, err := sc.leaseMgr.PublishMultiple(ctx, updates, nil); err != nil {

if _, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, nil); err != nil {
return err
}
if err := sc.waitToUpdateLeases(ctx, sc.tableID); err != nil {
Expand Down Expand Up @@ -340,7 +354,7 @@ func (sc *SchemaChanger) validateConstraints(
return err
}

if fn := sc.testingKnobs.RunBeforeChecksValidation; fn != nil {
if fn := sc.testingKnobs.RunBeforeConstraintValidation; fn != nil {
if err := fn(); err != nil {
return err
}
Expand Down
59 changes: 36 additions & 23 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,18 @@ var errDidntUpdateDescriptor = errors.New("didn't update the table descriptor")
// time by first waiting for all nodes to be on the current (pre-update) version
// of the table desc.
//
// The update closure for each table ID is called after the wait, and it
// provides the new version of the descriptor to be written.
// The update closure for all tables is called after the wait. The argument to
// the closure is a map of the table descriptors with the IDs given in tableIDs,
// and the closure mutates those descriptors.
//
// The closures may be called multiple times if retries occur; make sure they do
// The closure may be called multiple times if retries occur; make sure it does
// not have side effects.
//
// Returns the updated versions of the descriptors.
func (s LeaseStore) PublishMultiple(
ctx context.Context,
updates map[sqlbase.ID]func(descriptor *sqlbase.MutableTableDescriptor) error,
tableIDs []sqlbase.ID,
update func(map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error,
logEvent func(*client.Txn) error,
) (map[sqlbase.ID]*sqlbase.ImmutableTableDescriptor, error) {
errLeaseVersionChanged := errors.New("lease version changed")
Expand All @@ -355,7 +357,7 @@ func (s LeaseStore) PublishMultiple(
// Wait until there are no unexpired leases on the previous versions
// of the tables.
expectedVersions := make(map[sqlbase.ID]sqlbase.DescriptorVersion)
for id := range updates {
for _, id := range tableIDs {
expected, err := s.WaitForOneVersion(ctx, id, base.DefaultRetryOptions())
if err != nil {
return nil, err
Expand All @@ -367,45 +369,50 @@ func (s LeaseStore) PublishMultiple(
// There should be only one version of the descriptor, but it's
// a race now to update to the next version.
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
for id, update := range updates {
versions := make(map[sqlbase.ID]sqlbase.DescriptorVersion)
descsToUpdate := make(map[sqlbase.ID]*sqlbase.MutableTableDescriptor)
for _, id := range tableIDs {
// Re-read the current versions of the table descriptor, this time
// transactionally.
var err error
tableDesc, err := sqlbase.GetMutableTableDescFromID(ctx, txn, id)
descsToUpdate[id], err = sqlbase.GetMutableTableDescFromID(ctx, txn, id)
if err != nil {
return err
}

if expectedVersions[id] != tableDesc.Version {
if expectedVersions[id] != descsToUpdate[id].Version {
// The version changed out from under us. Someone else must be
// performing a schema change operation.
if log.V(3) {
log.Infof(ctx, "publish (version changed): %d != %d", expectedVersions[id], tableDesc.Version)
log.Infof(ctx, "publish (version changed): %d != %d", expectedVersions[id], descsToUpdate[id].Version)
}
return errLeaseVersionChanged
}

// Run the update closure.
version := tableDesc.Version
if err := update(tableDesc); err != nil {
return err
}
if version != tableDesc.Version {
versions[id] = descsToUpdate[id].Version
}

// Run the update closure.
if err := update(descsToUpdate); err != nil {
return err
}
for _, id := range tableIDs {
if versions[id] != descsToUpdate[id].Version {
return errors.Errorf("updated version to: %d, expected: %d",
tableDesc.Version, version)
descsToUpdate[id].Version, versions[id])
}

if err := tableDesc.MaybeIncrementVersion(ctx, txn); err != nil {
if err := descsToUpdate[id].MaybeIncrementVersion(ctx, txn); err != nil {
return err
}
if err := tableDesc.ValidateTable(s.settings); err != nil {
if err := descsToUpdate[id].ValidateTable(s.settings); err != nil {
return err
}

tableDescs[id] = tableDesc
tableDescs[id] = descsToUpdate[id]
}

// Write the updated descriptor.
// Write the updated descriptors.
if err := txn.SetSystemConfigTrigger(); err != nil {
return err
}
Expand Down Expand Up @@ -468,10 +475,16 @@ func (s LeaseStore) Publish(
update func(*sqlbase.MutableTableDescriptor) error,
logEvent func(*client.Txn) error,
) (*sqlbase.ImmutableTableDescriptor, error) {
updates := make(map[sqlbase.ID]func(descriptor *sqlbase.MutableTableDescriptor) error)
updates[tableID] = update
tableIDs := []sqlbase.ID{tableID}
updates := func(descs map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error {
desc, ok := descs[tableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", tableID)
}
return update(desc)
}

results, err := s.PublishMultiple(ctx, updates, logEvent)
results, err := s.PublishMultiple(ctx, tableIDs, updates, logEvent)
if err != nil {
return nil, err
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -972,3 +972,43 @@ t a_auto_not_null1 CHECK CHECK ((a IS NOT NULL)) true

statement ok
DROP TABLE t

# Check for adding constraints NOT VALID
statement ok
CREATE TABLE t (a int);

statement ok
INSERT INTO t VALUES (10), (15), (17)

statement error pq: validation of CHECK "a < 16" failed on row: a=17
ALTER TABLE t ADD CHECK (a < 16)

statement ok
ALTER TABLE t ADD CHECK (a < 100)

statement ok
ALTER TABLE t ADD CHECK (a < 16) NOT VALID

query TTTTB
SHOW CONSTRAINTS FROM t
----
t check_a CHECK CHECK ((a < 100)) true
t check_a1 CHECK CHECK ((a < 16)) false

query error pq: failed to satisfy CHECK constraint \(a < 16\)
INSERT INTO t VALUES (20)

statement error pq: validation of CHECK "a < 16" failed on row: a=17
ALTER TABLE t VALIDATE CONSTRAINT check_a1

statement ok
DELETE FROM t WHERE a = 17

statement ok
ALTER TABLE t VALIDATE CONSTRAINT check_a1

query TTTTB
SHOW CONSTRAINTS FROM t
----
t check_a CHECK CHECK ((a < 100)) true
t check_a1 CHECK CHECK ((a < 16)) true
Loading

0 comments on commit fa68533

Please sign in to comment.