34301: sql: validate check constraints with the schema changer r=lucy-zhang a=lucy-zhang

Currently, constraints are added in the `Unvalidated` state and are not
validated for existing rows until `ALTER TABLE ... VALIDATE CONSTRAINT` is run.
With this change, check constraints will be validated asynchronously by default
after they are added (similar changes to FKs are to follow). This addresses the
problematic long-running transactions caused by the current implementation of
`VALIDATE CONSTRAINT`. This PR is a rework of #32504 and has the same tests.

With this change, check constraints will be added to the table descriptor in
the new `Validating` state, visible to CRUD operations, and a mutation is
queued indicating that the constraint is to be validated. During the backfill
step, the constraint is validated for existing rows. If validation succeeds,
then the constraint moves to the `Validated` state; otherwise, it is dropped.
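
Concretely, the same `ADD CONSTRAINT` now returns as soon as the constraint is
in the `Validating` state, and the validation surfaces as an asynchronous
schema-change job. A sketch extending the hypothetical file above (`[SHOW JOBS]`
is CockroachDB's statement-as-table syntax):

```go
// newFlow sketches the post-change behavior: ADD CONSTRAINT enqueues a
// validation mutation and returns; existing rows are validated
// asynchronously after the transaction commits. If validation fails, the
// constraint is dropped rather than left Unvalidated.
func newFlow(db *sql.DB) error {
	if _, err := db.Exec(`ALTER TABLE t ADD CONSTRAINT positive CHECK (v > 0)`); err != nil {
		return err
	}
	// The constraint is already visible to new writes while validation of
	// existing rows is still in flight; the validation itself shows up as a
	// schema-change job.
	var status string
	if err := db.QueryRow(
		`SELECT status FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1`,
	).Scan(&status); err != nil {
		return err
	}
	log.Printf("latest job status: %s", status)
	return nil
}
```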

The behavior when dropping constraints (either via DROP CONSTRAINT or
indirectly when a column is dropped) is unchanged: no mutation is enqueued.

As part of this change, check constraints can now be added to non-public
columns that are themselves in the process of being added, including columns
created earlier in the same transaction.
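
For example, the following single transaction now works: the check references
a column that is still non-public because it was created moments earlier (same
hypothetical table `t` and setup as the sketches above):

```go
// addColumnAndCheck sketches the same-transaction case enabled by this
// change: the check constraint references column w, which is still
// non-public because it was added earlier in this very transaction.
func addColumnAndCheck(db *sql.DB) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(`ALTER TABLE t ADD COLUMN w INT DEFAULT 1`); err != nil {
		_ = tx.Rollback()
		return err
	}
	if _, err := tx.Exec(`ALTER TABLE t ADD CONSTRAINT w_positive CHECK (w > 0)`); err != nil {
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}
```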

The main difference between this PR and #32504 is that #32504 does not add the
constraint to the table descriptor until it has been validated.

See #34238 for more context.

Release note (sql change): Adding a check constraint now, by default, validates
existing table data against the new constraint asynchronously after the
transaction commits.

34720: rpc: always trace incoming RPCs if tracing is enabled r=andreimatei a=andreimatei

Before this patch, an incoming RPC would not be traced if the caller
wasn't tracing. Usually this was inconsequential, as the caller is
generally traced whenever tracing is enabled. However, for the status
server (AdminUI calls) this was not true: the caller (the browser, going
through an HTTP->gRPC gateway) never traces its calls.
This patch makes the server create a span regardless of the caller
whenever tracing is enabled.
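
In sketch form, the server-side decision reduces to a disjunction. The block
below is a dependency-free restatement, not the committed code (that is
`spanInclusionFuncForServer` in the pkg/rpc/context.go hunk further down);
both inputs are precomputed booleans here:

```go
// serverShouldTrace restates the rule this patch installs on the server:
// create a span if the caller is tracing, or unconditionally if this node's
// tracer is configured to always trace (the AdminUI / HTTP->gRPC gateway
// case, where the caller never is).
func serverShouldTrace(callerIsTracing, alwaysTrace bool) bool {
	return callerIsTracing || alwaysTrace
}
```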

Fixes #34310

Release note: None

34798: roachtest: add two more indexes to bulk index creation test r=vivekmenezes a=vivekmenezes

Release note: None

34829: jobs: only include running and very recently finished jobs in SHOW JOBS r=dt a=dt

On a cluster that has been running for a long time, or that runs frequent
periodic jobs, SHOW JOBS can output an unbounded, massive wall of text. This
makes it hard to find the jobs you are likely interested in -- those that are
running or have very recently finished.

This changes SHOW JOBS to include only running jobs and those that finished in
the last 12 hours. The full listing of jobs is still available via
crdb_internal.jobs.
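
For instance, a query along these lines pulls the older history directly (a
sketch reusing the database/sql setup from the first code block above; column
names are assumed from crdb_internal.jobs and should be verified against your
version, and the 12-hour cutoff mirrors the new SHOW JOBS default):

```go
// listOlderJobs sketches how to inspect jobs that SHOW JOBS now filters
// out: everything that finished more than 12 hours ago.
func listOlderJobs(db *sql.DB) error {
	rows, err := db.Query(`
		SELECT job_id, job_type, status, finished::STRING
		FROM crdb_internal.jobs
		WHERE finished < now() - INTERVAL '12 hours'
		ORDER BY finished DESC`)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var id int64
		var typ, status, finished string
		if err := rows.Scan(&id, &typ, &status, &finished); err != nil {
			return err
		}
		log.Printf("%d\t%s\t%s\t%s", id, typ, status, finished)
	}
	return rows.Err()
}
```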

Release note (general change): SHOW JOBS now returns only running and recently finished jobs. Older jobs can still be inspected via the crdb_internal.jobs table.

Co-authored-by: Lucy Zhang <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Vivek Menezes <[email protected]>
Co-authored-by: David Taylor <[email protected]>
5 people committed Feb 12, 2019
5 parents b8e9890 + baa598c + 88ff817 + 9b054c0 + ef769e0 commit 5438cdb
Showing 22 changed files with 1,220 additions and 326 deletions.
12 changes: 9 additions & 3 deletions pkg/cmd/roachtest/schemachange.go
@@ -336,11 +336,15 @@ SET CLUSTER SETTING schemachanger.bulk_index_backfill.enabled = true
}
db.Close()
}
if bulkInsert {
return runAndLogStmts(ctx, t, c, "addindex", []string{
`CREATE UNIQUE INDEX ON tpcc.order (o_entry_d, o_w_id, o_d_id, o_carrier_id, o_id);`,
`CREATE INDEX ON tpcc.order (o_carrier_id);`,
`CREATE INDEX ON tpcc.customer (c_last, c_first);`,
})
}
return runAndLogStmts(ctx, t, c, "addindex", []string{
`CREATE UNIQUE INDEX ON tpcc.order (o_entry_d, o_w_id, o_d_id, o_carrier_id, o_id);`,
// TODO(vivek): Enable these once ADD INDEX performance has improved.
// `CREATE INDEX ON tpcc.order (o_carrier_id);`,
// `CREATE INDEX ON tpcc.customer (c_last, c_first);`,
})
},
Duration: length,
@@ -463,6 +467,8 @@ func runAndLogStmts(ctx context.Context, t *test, c *cluster, prefix string, stm
c.l.Printf("%s: running %d statements\n", prefix, len(stmts))
start := timeutil.Now()
for i, stmt := range stmts {
// Let some traffic run before the schema change.
time.Sleep(time.Minute)
c.l.Printf("%s: running statement %d...\n", prefix, i+1)
before := timeutil.Now()
if _, err := db.Exec(stmt); err != nil {
32 changes: 29 additions & 3 deletions pkg/rpc/context.go
@@ -88,7 +88,24 @@ var sourceAddr = func() net.Addr {

var enableRPCCompression = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RPC_COMPRESSION", true)

func spanInclusionFunc(
// spanInclusionFuncForServer is used as a SpanInclusionFunc for the server-side
// of RPCs, deciding for which operations the gRPC opentracing interceptor should
// create a span.
func spanInclusionFuncForServer(
t *tracing.Tracer, parentSpanCtx opentracing.SpanContext, method string, req, resp interface{},
) bool {
// Is the client tracing?
return (parentSpanCtx != nil && !tracing.IsNoopContext(parentSpanCtx)) ||
// Should we trace regardless of the client? This is useful for calls coming
// through the HTTP->RPC gateway (i.e. the AdminUI), where the client is never
// tracing.
t.AlwaysTrace()
}

// spanInclusionFuncForClient is used as a SpanInclusionFunc for the client-side
// of RPCs, deciding for which operations the gRPC opentracing interceptor should
// create a span.
func spanInclusionFuncForClient(
parentSpanCtx opentracing.SpanContext, method string, req, resp interface{},
) bool {
return parentSpanCtx != nil && !tracing.IsNoopContext(parentSpanCtx)
@@ -172,7 +189,16 @@ func NewServerWithInterceptor(
// tracing is disabled.
unaryInterceptor = otgrpc.OpenTracingServerInterceptor(
tracer,
otgrpc.IncludingSpans(otgrpc.SpanInclusionFunc(spanInclusionFunc)),
otgrpc.IncludingSpans(otgrpc.SpanInclusionFunc(
func(
parentSpanCtx opentracing.SpanContext,
method string,
req, resp interface{}) bool {
// This anonymous func serves to bind the tracer for
// spanInclusionFuncForServer.
return spanInclusionFuncForServer(
tracer.(*tracing.Tracer), parentSpanCtx, method, req, resp)
})),
)
// TODO(tschottdorf): should set up tracing for stream-based RPCs as
// well. The otgrpc package has no such facility, but there's also this:
@@ -574,7 +600,7 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
// the number of packets (even with an empty context!). See #17177.
interceptor := otgrpc.OpenTracingClientInterceptor(
tracer,
otgrpc.IncludingSpans(otgrpc.SpanInclusionFunc(spanInclusionFunc)),
otgrpc.IncludingSpans(otgrpc.SpanInclusionFunc(spanInclusionFuncForClient)),
)
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(interceptor))
}
26 changes: 23 additions & 3 deletions pkg/sql/alter_table.go
@@ -213,15 +213,23 @@ func (n *alterTableNode) startExec(params runParams) error {
}

case *tree.CheckConstraintTableDef:
// A previous command could have added a column which the new constraint
// uses; allocate IDs now.
if err := n.tableDesc.AllocateIDs(); err != nil {
return err
}

ck, err := MakeCheckConstraint(params.ctx,
n.tableDesc, d, inuseNames, &params.p.semaCtx, n.n.Table)
if err != nil {
return err
}
ck.Validity = sqlbase.ConstraintValidity_Unvalidated
ck.Validity = sqlbase.ConstraintValidity_Validating
n.tableDesc.Checks = append(n.tableDesc.Checks, ck)
descriptorChanged = true

n.tableDesc.AddCheckValidationMutation(ck.Name)

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
col, _, err := n.tableDesc.FindColumnByName(colName)
@@ -402,7 +410,11 @@ func (n *alterTableNode) startExec(params runParams) error {
for _, check := range n.tableDesc.Checks {
if used, err := check.UsesColumn(n.tableDesc.TableDesc(), col.ID); err != nil {
return err
} else if !used {
} else if used {
if check.Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("referencing constraint %q in the middle of being added, try again later", check.Name)
}
} else {
validChecks = append(validChecks, check)
}
}
@@ -452,6 +464,9 @@
return fmt.Errorf("UNIQUE constraint depends on index %q, use DROP INDEX with CASCADE if you really want to drop it", t.Constraint)
case sqlbase.ConstraintTypeCheck:
for i := range n.tableDesc.Checks {
if n.tableDesc.Checks[i].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}
if n.tableDesc.Checks[i].Name == name {
n.tableDesc.Checks = append(n.tableDesc.Checks[:i], n.tableDesc.Checks[i+1:]...)
descriptorChanged = true
@@ -498,9 +513,14 @@
if !found {
panic("constraint returned by GetConstraintInfo not found")
}

if n.tableDesc.Checks[idx].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}

ck := n.tableDesc.Checks[idx]
if err := validateCheckExpr(
params.ctx, ck.Expr, &n.n.Table, n.tableDesc.TableDesc(), params.EvalContext(),
params.ctx, ck.Expr, n.tableDesc.TableDesc(), params.EvalContext().InternalExecutor, params.EvalContext().Txn,
); err != nil {
return err
}
150 changes: 149 additions & 1 deletion pkg/sql/backfill.go
@@ -125,6 +125,8 @@ func (sc *SchemaChanger) runBackfill(
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexDescs []sqlbase.IndexDescriptor

var checksToValidate []sqlbase.ConstraintToValidate

var tableDesc *sqlbase.TableDescriptor
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
@@ -156,6 +158,13 @@
}
case *sqlbase.DescriptorMutation_Index:
addedIndexDescs = append(addedIndexDescs, *t.Index)
case *sqlbase.DescriptorMutation_Constraint:
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToValidate_CHECK:
checksToValidate = append(checksToValidate, *t.Constraint)
default:
return errors.Errorf("unsupported constraint type: %d", t.Constraint.ConstraintType)
}
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
@@ -168,6 +177,8 @@
if !sc.canClearRangeForDrop(t.Index) {
droppedIndexDescs = append(droppedIndexDescs, *t.Index)
}
case *sqlbase.DescriptorMutation_Constraint:
// no-op
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
@@ -198,9 +209,95 @@
}
}

// Validate check constraints.
if len(checksToValidate) > 0 {
if err := sc.validateChecks(ctx, evalCtx, lease, checksToValidate); err != nil {
return err
}
}
return nil
}

func (sc *SchemaChanger) validateChecks(
ctx context.Context,
evalCtx *extendedEvalContext,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
checks []sqlbase.ConstraintToValidate,
) error {
if testDisableTableLeases {
return nil
}
readAsOf := sc.clock.Now()
return sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, readAsOf)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}

if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

grp := ctxgroup.WithContext(ctx)

// Notify when validation is finished (or has returned an error) for a check.
countDone := make(chan struct{}, len(checks))

for _, c := range checks {
c := c // capture the loop variable; each goroutine below validates its own check
grp.GoCtx(func(ctx context.Context) error {
defer func() { countDone <- struct{}{} }()

// Make the mutations public in a private copy of the descriptor
// and add it to the TableCollection, so that we can use SQL below to perform
// the validation. We wouldn't have needed to do this if we could have
// updated the descriptor and run validation in the same transaction. However,
// our current system is incapable of running long-running schema changes
// (the validation can take many minutes). So we pretend that the schema
// has been updated and actually update it in a separate transaction that
// follows this one.
desc, err := sqlbase.NewImmutableTableDescriptor(*tableDesc).MakeFirstMutationPublic()
if err != nil {
return err
}
// Create a new eval context only because the eval context cannot be shared across many
// goroutines.
newEvalCtx := createSchemaChangeEvalCtx(ctx, readAsOf, evalCtx.Tracing, sc.ieFactory)
return validateCheckInTxn(ctx, sc.leaseMgr, &newEvalCtx.EvalContext, desc, txn, &c.Name)
})
}

// Periodic schema change lease extension.
grp.GoCtx(func(ctx context.Context) error {
count := len(checks)
refreshTimer := timeutil.NewTimer()
defer refreshTimer.Stop()
refreshTimer.Reset(checkpointInterval)
for {
select {
case <-countDone:
count--
if count == 0 {
// Stop.
return nil
}

case <-refreshTimer.C:
refreshTimer.Read = true
refreshTimer.Reset(checkpointInterval)
if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

case <-ctx.Done():
return ctx.Err()
}
}
})
return grp.Wait()
})
}

func (sc *SchemaChanger) getTableVersion(
ctx context.Context, txn *client.Txn, tc *TableCollection, version sqlbase.DescriptorVersion,
) (*sqlbase.ImmutableTableDescriptor, error) {
@@ -701,11 +798,14 @@ func runSchemaChangesInTxn(
// Only needed because columnBackfillInTxn() backfills
// all column mutations.
doneColumnBackfill := false
// Checks are validated after all other mutations have been applied.
var checksToValidate []sqlbase.ConstraintToValidate

for _, m := range tableDesc.Mutations {
immutDesc := sqlbase.NewImmutableTableDescriptor(*tableDesc.TableDesc())
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
switch m.Descriptor_.(type) {
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
if doneColumnBackfill || !sqlbase.ColumnNeedsBackfill(m.GetColumn()) {
break
@@ -720,6 +820,14 @@
return err
}

case *sqlbase.DescriptorMutation_Constraint:
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToValidate_CHECK:
checksToValidate = append(checksToValidate, *t.Constraint)
default:
return errors.Errorf("unsupported constraint type: %d", t.Constraint.ConstraintType)
}

default:
return errors.Errorf("unsupported mutation: %+v", m)
}
@@ -741,6 +849,9 @@
return err
}

case *sqlbase.DescriptorMutation_Constraint:
return errors.Errorf("constraint validation mutation cannot be in the DROP state within the same transaction: %+v", m)

default:
return errors.Errorf("unsupported mutation: %+v", m)
}
@@ -752,9 +863,46 @@
}
tableDesc.Mutations = nil

// Now that the table descriptor is in a valid state, with all column and
// index mutations applied, it can be used for validating check constraints.
for _, c := range checksToValidate {
if err := validateCheckInTxn(ctx, tc.leaseMgr, evalCtx, tableDesc, txn, &c.Name); err != nil {
return err
}
}
return nil
}

// validateCheckInTxn validates check constraints within the provided
// transaction. The table descriptor that is passed in will be used for the
// InternalExecutor that performs the validation query.
func validateCheckInTxn(
ctx context.Context,
leaseMgr *LeaseManager,
evalCtx *tree.EvalContext,
tableDesc *MutableTableDescriptor,
txn *client.Txn,
checkName *string,
) error {
newTc := &TableCollection{leaseMgr: leaseMgr}
// Pretend that the schema has been modified.
if err := newTc.addUncommittedTable(*tableDesc); err != nil {
return err
}

ie := evalCtx.InternalExecutor.(*SessionBoundInternalExecutor)
ie.impl.tcModifier = newTc
defer func() {
ie.impl.tcModifier = nil
}()

check, err := tableDesc.FindCheckByName(*checkName)
if err != nil {
return err
}
return validateCheckExpr(ctx, check.Expr, tableDesc.TableDesc(), ie, txn)
}

// columnBackfillInTxn backfills columns for all mutation columns in
// the mutation list.
func columnBackfillInTxn(
