Skip to content

Commit

Permalink
sql: validate check constraints with the schema changer
Browse files Browse the repository at this point in the history
Currently, constraints are added in the `Unvalidated` state, and are not
validated for existing rows until ALTER TABLE ... VALIDATE CONSTRAINT is run.
With this change, check constraints will be validated asynchronously after they
are added by default (and similar changes to FKs are to follow). This addresses
the problematic long-running transactions caused by the current implementation
of VALIDATE CONSTRAINT. This PR is a rework of cockroachdb#32504, and has the same tests.

With this change, check constraints will be added to the table descriptor in
the new `Validating` state, visible to CRUD operations, and a mutation is
queued indicating that the constraint is to be validated. During the backfill
step, the constraint is validated for existing rows. If validation succeeds,
then the constraint moves to the `Validated` state; otherwise, it is dropped.

The behavior when dropping constraints (either via DROP CONSTRAINT or
indirectly when a column is dropped) is unchanged: no mutation is enqueued.

As part of this change, check constraints can be added to non-public columns in
the process of being added, including columns that were created earlier in the
same transaction.

The main difference between this PR and cockroachdb#32504 is that cockroachdb#32504 does not add the
constraint to the table descriptor until it has been validated.

Release note (sql change): By default, adding a check constraint now validates
existing table data against the new constraint asynchronously after the
transaction commits.
  • Loading branch information
lucy-zhang committed Jan 28, 2019
1 parent ff4db19 commit 938fb99
Show file tree
Hide file tree
Showing 22 changed files with 1,527 additions and 623 deletions.
38 changes: 22 additions & 16 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package sql

import (
"bytes"
"context"
gojson "encoding/json"
"fmt"
Expand Down Expand Up @@ -213,15 +212,23 @@ func (n *alterTableNode) startExec(params runParams) error {
}

case *tree.CheckConstraintTableDef:
// A previous command could have added a column which the new constraint uses;
// allocate IDs now.
if err := n.tableDesc.AllocateIDs(); err != nil {
return err
}

ck, err := MakeCheckConstraint(params.ctx,
n.tableDesc, d, inuseNames, &params.p.semaCtx, n.n.Table)
if err != nil {
return err
}
ck.Validity = sqlbase.ConstraintValidity_Unvalidated
ck.Validity = sqlbase.ConstraintValidity_Validating
n.tableDesc.Checks = append(n.tableDesc.Checks, ck)
descriptorChanged = true

n.tableDesc.AddCheckValidationMutation(ck.Name)

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
col, _, err := n.tableDesc.FindColumnByName(colName)
Expand Down Expand Up @@ -402,7 +409,11 @@ func (n *alterTableNode) startExec(params runParams) error {
for _, check := range n.tableDesc.Checks {
if used, err := check.UsesColumn(n.tableDesc.TableDesc(), col.ID); err != nil {
return err
} else if !used {
} else if used {
if check.Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("referencing constraint %q in the middle of being added, try again later", check.Name)
}
} else {
validChecks = append(validChecks, check)
}
}
Expand Down Expand Up @@ -452,6 +463,9 @@ func (n *alterTableNode) startExec(params runParams) error {
return fmt.Errorf("UNIQUE constraint depends on index %q, use DROP INDEX with CASCADE if you really want to drop it", t.Constraint)
case sqlbase.ConstraintTypeCheck:
for i := range n.tableDesc.Checks {
if n.tableDesc.Checks[i].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}
if n.tableDesc.Checks[i].Name == name {
n.tableDesc.Checks = append(n.tableDesc.Checks[:i], n.tableDesc.Checks[i+1:]...)
descriptorChanged = true
Expand Down Expand Up @@ -498,6 +512,11 @@ func (n *alterTableNode) startExec(params runParams) error {
if !found {
panic("constraint returned by GetConstraintInfo not found")
}

if n.tableDesc.Checks[idx].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}

ck := n.tableDesc.Checks[idx]
if err := params.p.validateCheckExpr(
params.ctx, ck.Expr, &n.n.Table, n.tableDesc.TableDesc(),
Expand Down Expand Up @@ -772,19 +791,6 @@ func applyColumnMutation(
return nil
}

// labeledRowValues formats a row as a comma-separated list of
// "column=value" pairs, pairing each column name in cols with the datum
// at the same position in values.
func labeledRowValues(cols []sqlbase.ColumnDescriptor, values tree.Datums) string {
	var buf bytes.Buffer
	for i := range cols {
		if i > 0 {
			buf.WriteString(`, `)
		}
		buf.WriteString(cols[i].Name)
		buf.WriteString(`=`)
		buf.WriteString(values[i].String())
	}
	return buf.String()
}

// injectTableStats implements the INJECT STATISTICS command, which deletes any
// existing statistics on the table and replaces them with the statistics in the
// given json object (in the same format as the result of SHOW STATISTICS USING
Expand Down
73 changes: 72 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
// TODO(vivek): Replace these constants with a runtime budget for the
// operation chunk involved.

// checkConstraintBackfillChunkSize is the maximum number of rows
// processed per chunk during check constraint validation. This value
// is larger than the other chunk constants because the operation involves
// only running a scan and does not write.
checkConstraintBackfillChunkSize = 1600

// columnTruncateAndBackfillChunkSize is the maximum number of columns
// processed per chunk during column truncate or backfill.
columnTruncateAndBackfillChunkSize = 200
Expand Down Expand Up @@ -121,6 +127,8 @@ func (sc *SchemaChanger) runBackfill(
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexDescs []sqlbase.IndexDescriptor

var checksToValidate []sqlbase.DescriptorMutation_ConstraintToValidate

var tableDesc *sqlbase.TableDescriptor
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
Expand Down Expand Up @@ -152,6 +160,10 @@ func (sc *SchemaChanger) runBackfill(
}
case *sqlbase.DescriptorMutation_Index:
addedIndexDescs = append(addedIndexDescs, *t.Index)
case *sqlbase.DescriptorMutation_Constraint:
if t.Constraint.ConstraintType == sqlbase.DescriptorMutation_ConstraintToValidate_CHECK {
checksToValidate = append(checksToValidate, *t.Constraint)
}
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -164,6 +176,8 @@ func (sc *SchemaChanger) runBackfill(
if !sc.canClearRangeForDrop(t.Index) {
droppedIndexDescs = append(droppedIndexDescs, *t.Index)
}
case *sqlbase.DescriptorMutation_Constraint:
// no-op
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand Down Expand Up @@ -194,6 +208,12 @@ func (sc *SchemaChanger) runBackfill(
}
}

if len(checksToValidate) > 0 {
if err := sc.validateChecks(ctx, evalCtx, lease, version); err != nil {
return err
}
}

return nil
}

Expand All @@ -210,6 +230,18 @@ func (sc *SchemaChanger) getTableVersion(
return tableDesc, nil
}

// validateChecks validates pending check constraints against all
// existing rows by running the distributed backfill machinery with the
// check-constraint mutation filter.
func (sc *SchemaChanger) validateChecks(
	ctx context.Context,
	evalCtx *extendedEvalContext,
	lease *sqlbase.TableDescriptor_SchemaChangeLease,
	version sqlbase.DescriptorVersion,
) error {
	// Delegate to the generic distributed backfill; only mutations
	// selected by CheckMutationFilter are processed.
	return sc.distBackfill(
		ctx, evalCtx, lease, version,
		checkConstraintBackfill, checkConstraintBackfillChunkSize,
		backfill.CheckMutationFilter,
	)
}

func (sc *SchemaChanger) truncateIndexes(
ctx context.Context,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
Expand Down Expand Up @@ -294,6 +326,7 @@ const (
_ backfillType = iota
columnBackfill
indexBackfill
checkConstraintBackfill
)

// getJobIDForMutationWithDescriptor returns a job id associated with a mutation given
Expand Down Expand Up @@ -534,7 +567,7 @@ func runSchemaChangesInTxn(

// Only needed because columnBackfillInTxn() backfills
// all column mutations.
doneColumnBackfill := false
doneColumnBackfill, doneCheckValidation := false, false
for _, m := range tableDesc.Mutations {
immutDesc := sqlbase.NewImmutableTableDescriptor(*tableDesc.TableDesc())
switch m.Direction {
Expand All @@ -554,6 +587,18 @@ func runSchemaChangesInTxn(
return err
}

case *sqlbase.DescriptorMutation_Constraint:
check, err := tableDesc.FindCheckByName(m.GetConstraint().Name)
if err != nil {
return err
}
if doneCheckValidation || check.Validity == sqlbase.ConstraintValidity_Validated {
break
}
if err := checkValidateInTxn(ctx, txn, evalCtx, immutDesc, traceKV); err != nil {
return err
}
doneCheckValidation = true
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -575,6 +620,9 @@ func runSchemaChangesInTxn(
return err
}

case *sqlbase.DescriptorMutation_Constraint:
return errors.Errorf("constraint validation mutation cannot be in the DROP state within the same transaction: %+v", m)

default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -589,6 +637,29 @@ func runSchemaChangesInTxn(
return nil
}

// checkValidateInTxn validates pending check constraints for tableDesc
// within the supplied transaction, scanning the primary index in chunks
// of checkConstraintBackfillChunkSize rows.
func checkValidateInTxn(
	ctx context.Context,
	txn *client.Txn,
	evalCtx *tree.EvalContext,
	tableDesc *sqlbase.ImmutableTableDescriptor,
	traceKV bool,
) error {
	var cb backfill.CheckBackfiller
	if err := cb.Init(evalCtx, tableDesc); err != nil {
		return err
	}

	// Walk the primary index span chunk by chunk. RunCheckBackfillChunk
	// returns the resume key; a nil key means the span is exhausted.
	span := tableDesc.PrimaryIndexSpan()
	for span.Key != nil {
		resume, err := cb.RunCheckBackfillChunk(
			ctx, txn, tableDesc, span, checkConstraintBackfillChunkSize, traceKV,
		)
		if err != nil {
			return err
		}
		span.Key = resume
	}
	return nil
}

// columnBackfillInTxn backfills columns for all mutation columns in
// the mutation list.
func columnBackfillInTxn(
Expand Down
Loading

0 comments on commit 938fb99

Please sign in to comment.